diff options
-rw-r--r-- | .cargo_vcs_info.json | 6 | ||||
-rw-r--r-- | CHANGELOG.md | 81 | ||||
-rw-r--r-- | Cargo.lock | 443 | ||||
-rw-r--r-- | Cargo.toml | 56 | ||||
-rw-r--r-- | Cargo.toml.orig | 31 | ||||
-rw-r--r-- | LICENSE-APACHE | 201 | ||||
-rw-r--r-- | LICENSE-MIT | 23 | ||||
-rw-r--r-- | README.md | 69 | ||||
-rw-r--r-- | benches/spawn.rs | 22 | ||||
-rw-r--r-- | examples/spawn-local.rs | 73 | ||||
-rw-r--r-- | examples/spawn-on-thread.rs | 53 | ||||
-rw-r--r-- | examples/spawn.rs | 48 | ||||
-rw-r--r-- | src/header.rs | 162 | ||||
-rw-r--r-- | src/lib.rs | 99 | ||||
-rw-r--r-- | src/raw.rs | 707 | ||||
-rw-r--r-- | src/runnable.rs | 398 | ||||
-rw-r--r-- | src/state.rs | 69 | ||||
-rw-r--r-- | src/task.rs | 532 | ||||
-rw-r--r-- | src/utils.rs | 127 | ||||
-rw-r--r-- | tests/basic.rs | 299 | ||||
-rw-r--r-- | tests/cancel.rs | 183 | ||||
-rw-r--r-- | tests/join.rs | 386 | ||||
-rw-r--r-- | tests/panic.rs | 234 | ||||
-rw-r--r-- | tests/ready.rs | 225 | ||||
-rw-r--r-- | tests/waker_panic.rs | 330 | ||||
-rw-r--r-- | tests/waker_pending.rs | 365 | ||||
-rw-r--r-- | tests/waker_ready.rs | 278 |
27 files changed, 5500 insertions, 0 deletions
diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json new file mode 100644 index 0000000..e80a435 --- /dev/null +++ b/.cargo_vcs_info.json @@ -0,0 +1,6 @@ +{ + "git": { + "sha1": "f910d25edb04d05a24c9e58d73a4e5d8a31163a6" + }, + "path_in_vcs": "" +}
\ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..d0a3e0b --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,81 @@ +# Version 4.3.0 + +- Bump MSRV to Rust 1.47. (#30) +- Evaluate the layouts for the tasks at compile time. (#30) +- Add layout_info field to TaskVTable so that debuggers can decode raw tasks. (#29) + +# Version 4.2.0 + +- Add `Task::is_finished`. (#19) + +# Version 4.1.0 + +- Add `FallibleTask`. (#21) + +# Version 4.0.3 + +- Document the return value of `Runnable::run()` better. + +# Version 4.0.2 + +- Nits in the docs. + +# Version 4.0.1 + +- Nits in the docs. + +# Version 4.0.0 + +- Rename `Task` to `Runnable`. +- Rename `JoinHandle` to `Task`. +- Cancel `Task` on drop. +- Add `Task::detach()` and `Task::cancel()`. +- Add `spawn_unchecked()`. + +# Version 3.0.0 + +- Use `ThreadId` in `spawn_local` because OS-provided IDs can get recycled. +- Add `std` feature to `Cargo.toml`. + +# Version 2.1.1 + +- Allocate large futures on the heap. + +# Version 2.1.0 + +- `JoinHandle` now only evaluates after the task's future has been dropped. + +# Version 2.0.0 + +- Return `true` in `Task::run()`. + +# Version 1.3.1 + +- Make `spawn_local` available only on unix and windows. + +# Version 1.3.0 + +- Add `waker_fn`. + +# Version 1.2.1 + +- Add the `no-std` category to the package. + +# Version 1.2.0 + +- The crate is now marked with `#![no_std]`. +- Add `Task::waker` and `JoinHandle::waker`. +- Add `Task::into_raw` and `Task::from_raw`. + +# Version 1.1.1 + +- Fix a use-after-free bug where the schedule function is dropped while running. + +# Version 1.1.0 + +- If a task is dropped or canceled outside the `run` method, it gets re-scheduled. +- Add `spawn_local` constructor. + +# Version 1.0.0 + +- Initial release diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..388f5ba --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,443 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +[[package]] +name = "async-channel" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2114d64672151c0c5eaa5e131ec84a74f06e1e559830dabba01ca30605d66319" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "871f9bb5e0a22eeb7e8cf16641feb87c9dc67032ccf8ff49e772eb9941d3a965" +dependencies = [ + "async-task 4.2.0", + "concurrent-queue", + "fastrand", + "futures-lite", + "once_cell", + "slab", +] + +[[package]] +name = "async-fs" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b3ca4f8ff117c37c278a2f7415ce9be55560b846b5bc4412aaa5d29c1c3dae2" +dependencies = [ + "async-lock", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-io" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5e18f61464ae81cde0a23e713ae8fd299580c54d697a35820cfd0625b8b0e07" +dependencies = [ + "concurrent-queue", + "futures-lite", + "libc", + "log", + "once_cell", + "parking", + "polling", + "slab", + "socket2", + "waker-fn", + "winapi", +] + +[[package]] +name = "async-lock" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e97a171d191782fba31bb902b14ad94e24a68145032b7eedf871ab0bc0d077b6" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-net" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5373304df79b9b4395068fb080369ec7178608827306ce4d081cba51cac551df" +dependencies = [ + "async-io", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-process" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf2c06e30a24e8c78a3987d07f0930edf76ef35e027e7bdb063fccafdad1f60c" +dependencies = [ + "async-io", + "blocking", + "cfg-if", + "event-listener", + "futures-lite", + "libc", + "once_cell", + "signal-hook", + "winapi", +] + +[[package]] +name = "async-task" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9" + +[[package]] +name = "async-task" +version = "4.3.0" +dependencies = [ + "atomic-waker", + "easy-parallel", + "flaky_test", + "flume", + "once_cell", + "smol", +] + +[[package]] +name = "atomic-waker" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" + +[[package]] +name = "autocfg" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" + +[[package]] +name = "blocking" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6ccb65d468978a086b69884437ded69a90faab3bbe6e67f242173ea728acccc" +dependencies = [ + "async-channel", + "async-task 4.2.0", + "atomic-waker", + "fastrand", + "futures-lite", + "once_cell", +] + +[[package]] +name = "cache-padded" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1db59621ec70f09c5e9b597b220c7a2b43611f4710dc03ceb8748637775692c" + +[[package]] +name = "cc" +version = "1.0.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + +[[package]] +name = "easy-parallel" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6907e25393cdcc1f4f3f513d9aac1e840eb1cc341a0fccb01171f7d14d10b946" + +[[package]] +name = "event-listener" +version = "2.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" + +[[package]] +name = "fastrand" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf" +dependencies = [ + "instant", +] + +[[package]] +name = "flaky_test" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479cde5eb168cf5a056dd98f311cbfab7494c216394e4fb9eba0336827a8db93" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "flume" +version = "0.10.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ceeb589a3157cac0ab8cc585feb749bd2cea5cb55a6ee802ad72d9fd38303da" +dependencies = [ + "spin", +] + +[[package]] +name = "futures-core" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" + +[[package]] +name = "futures-io" +version = "0.3.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" + +[[package]] +name = "futures-lite" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "libc" +version = "0.2.126" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" + +[[package]] +name = "lock_api" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "memchr" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" + +[[package]] +name = "once_cell" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225" + +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + +[[package]] +name = "pin-project-lite" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" + +[[package]] +name = "polling" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "685404d509889fade3e86fe3a5803bca2ec09b0c0778d5ada6ec8bf7a8de5259" +dependencies = [ + "cfg-if", + "libc", + "log", + "wepoll-ffi", + "winapi", +] + +[[package]] +name = "proc-macro2" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd96a1e8ed2596c337f8eae5f24924ec83f5ad5ab21ea8e455d3566c69fbcaf7" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bcdf212e9776fbcb2d23ab029360416bb1706b1aea2d1a5ba002727cbcab804" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + +[[package]] +name = "signal-hook" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a253b5e89e2698464fc26b545c9edceb338e18a89effeeecfea192c3025be29d" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" + +[[package]] +name = "smol" +version = "1.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85cf3b5351f3e783c1d79ab5fc604eeed8b8ae9abd36b166e8b87a089efd85e4" +dependencies = [ + "async-channel", + "async-executor", + "async-fs", + "async-io", + "async-lock", + "async-net", + "async-process", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "socket2" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "spin" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d" +dependencies = [ + "lock_api", +] + +[[package]] +name = "syn" +version = "1.0.98" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c50aef8a904de4c23c788f104b7dddc7d6f79c647c7c8ce4cc8f73eb0ca773dd" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" + +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + +[[package]] +name = "wepoll-ffi" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d743fdedc5c64377b5fc2bc036b01c7fd642205a0d96356034ae3404d49eb7fb" +dependencies = [ + "cc", +] + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..6e989e5 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,56 @@ +# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO +# +# When uploading crates to the registry Cargo will automatically +# "normalize" Cargo.toml files for maximal compatibility +# with all versions of Cargo and also rewrite `path` dependencies +# to registry (e.g., crates.io) dependencies. +# +# If you are reading this file be aware that the original Cargo.toml +# will likely look very different (and much more reasonable). +# See Cargo.toml.orig for the original contents. + +[package] +edition = "2018" +rust-version = "1.47" +name = "async-task" +version = "4.3.0" +authors = ["Stjepan Glavina <stjepang@gmail.com>"] +exclude = ["/.*"] +description = "Task abstraction for building executors" +readme = "README.md" +keywords = [ + "futures", + "task", + "executor", + "spawn", +] +categories = [ + "asynchronous", + "concurrency", + "no-std", +] +license = "Apache-2.0 OR MIT" +repository = "https://github.com/smol-rs/async-task" + +[dev-dependencies.atomic-waker] +version = "1" + +[dev-dependencies.easy-parallel] +version = "3" + +[dev-dependencies.flaky_test] +version = "0.1" + +[dev-dependencies.flume] +version = "0.10" +default-features = false + +[dev-dependencies.once_cell] +version = "1" + +[dev-dependencies.smol] +version = "1" + +[features] +default = ["std"] +std = [] diff --git a/Cargo.toml.orig b/Cargo.toml.orig new file mode 100644 index 0000000..8c611bf --- /dev/null +++ b/Cargo.toml.orig @@ -0,0 +1,31 @@ +[package] +name = "async-task" +# When publishing a new version: +# - Update CHANGELOG.md +# - Create "v4.x.y" git tag +version = "4.3.0" +authors = ["Stjepan Glavina <stjepang@gmail.com>"] +edition = "2018" +rust-version = "1.47" +license = "Apache-2.0 OR MIT" +repository = "https://github.com/smol-rs/async-task" +description = "Task abstraction for building executors" +keywords = ["futures", "task", "executor", "spawn"] +categories = ["asynchronous", "concurrency", "no-std"] +exclude = ["/.*"] + +[features] +default = ["std"] +std = [] + +[dev-dependencies] +atomic-waker = "1" +easy-parallel = "3" +flaky_test = "0.1" +flume = { version = "0.10", default-features = false } +once_cell = "1" +smol = "1" + +# rewrite dependencies to use the this version of async-task when running tests +[patch.crates-io] +async-task = { path = "." } diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..7044c9d --- /dev/null +++ b/README.md @@ -0,0 +1,69 @@ +# async-task + +[![Build](https://github.com/smol-rs/async-task/workflows/Build%20and%20test/badge.svg)]( +https://github.com/smol-rs/async-task/actions) +[![License](https://img.shields.io/badge/license-Apache--2.0_OR_MIT-blue.svg)]( +https://github.com/smol-rs/async-task) +[![Cargo](https://img.shields.io/crates/v/async-task.svg)]( +https://crates.io/crates/async-task) +[![Documentation](https://docs.rs/async-task/badge.svg)]( +https://docs.rs/async-task) + +Task abstraction for building executors. + +To spawn a future onto an executor, we first need to allocate it on the heap and keep some +state attached to it. The state indicates whether the future is ready for polling, waiting to +be woken up, or completed. Such a stateful future is called a *task*. + +All executors have a queue that holds scheduled tasks: + +```rust +let (sender, receiver) = flume::unbounded(); +``` + +A task is created using either `spawn()`, `spawn_local()`, or `spawn_unchecked()` which +return a `Runnable` and a `Task`: + +```rust +// A future that will be spawned. +let future = async { 1 + 2 }; + +// A function that schedules the task when it gets woken up. +let schedule = move |runnable| sender.send(runnable).unwrap(); + +// Construct a task. +let (runnable, task) = async_task::spawn(future, schedule); + +// Push the task into the queue by invoking its schedule function. +runnable.schedule(); +``` + +The `Runnable` is used to poll the task's future, and the `Task` is used to await its +output. + +Finally, we need a loop that takes scheduled tasks from the queue and runs them: + +```rust +for runnable in receiver { + runnable.run(); +} +``` + +Method `run()` polls the task's future once. Then, the `Runnable` +vanishes and only reappears when its `Waker` wakes the task, thus +scheduling it to be run again. + +## License + +Licensed under either of + + * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +#### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/benches/spawn.rs b/benches/spawn.rs new file mode 100644 index 0000000..75d059e --- /dev/null +++ b/benches/spawn.rs @@ -0,0 +1,22 @@ +#![feature(test)] + +extern crate test; + +use smol::future; +use test::Bencher; + +#[bench] +fn task_create(b: &mut Bencher) { + b.iter(|| { + let _ = async_task::spawn(async {}, drop); + }); +} + +#[bench] +fn task_run(b: &mut Bencher) { + b.iter(|| { + let (runnable, task) = async_task::spawn(async {}, drop); + runnable.run(); + future::block_on(task); + }); +} diff --git a/examples/spawn-local.rs b/examples/spawn-local.rs new file mode 100644 index 0000000..a9da1b4 --- /dev/null +++ b/examples/spawn-local.rs @@ -0,0 +1,73 @@ +//! A simple single-threaded executor that can spawn non-`Send` futures. + +use std::cell::Cell; +use std::future::Future; +use std::rc::Rc; + +use async_task::{Runnable, Task}; + +thread_local! { + // A queue that holds scheduled tasks. + static QUEUE: (flume::Sender<Runnable>, flume::Receiver<Runnable>) = flume::unbounded(); +} + +/// Spawns a future on the executor. +fn spawn<F, T>(future: F) -> Task<T> +where + F: Future<Output = T> + 'static, + T: 'static, +{ + // Create a task that is scheduled by pushing itself into the queue. + let schedule = |runnable| QUEUE.with(|(s, _)| s.send(runnable).unwrap()); + let (runnable, task) = async_task::spawn_local(future, schedule); + + // Schedule the task by pushing it into the queue. + runnable.schedule(); + + task +} + +/// Runs a future to completion. +fn run<F, T>(future: F) -> T +where + F: Future<Output = T> + 'static, + T: 'static, +{ + // Spawn a task that sends its result through a channel. + let (s, r) = flume::unbounded(); + spawn(async move { drop(s.send(future.await)) }).detach(); + + loop { + // If the original task has completed, return its result. + if let Ok(val) = r.try_recv() { + return val; + } + + // Otherwise, take a task from the queue and run it. + QUEUE.with(|(_, r)| r.recv().unwrap().run()); + } +} + +fn main() { + let val = Rc::new(Cell::new(0)); + + // Run a future that increments a non-`Send` value. + run({ + let val = val.clone(); + async move { + // Spawn a future that increments the value. + let task = spawn({ + let val = val.clone(); + async move { + val.set(dbg!(val.get()) + 1); + } + }); + + val.set(dbg!(val.get()) + 1); + task.await; + } + }); + + // The value should be 2 at the end of the program. + dbg!(val.get()); +} diff --git a/examples/spawn-on-thread.rs b/examples/spawn-on-thread.rs new file mode 100644 index 0000000..b0ec2f2 --- /dev/null +++ b/examples/spawn-on-thread.rs @@ -0,0 +1,53 @@ +//! A function that runs a future to completion on a dedicated thread. + +use std::future::Future; +use std::sync::Arc; +use std::thread; + +use async_task::Task; +use smol::future; + +/// Spawns a future on a new dedicated thread. +/// +/// The returned task can be used to await the output of the future. +fn spawn_on_thread<F, T>(future: F) -> Task<T> +where + F: Future<Output = T> + Send + 'static, + T: Send + 'static, +{ + // Create a channel that holds the task when it is scheduled for running. + let (sender, receiver) = flume::unbounded(); + let sender = Arc::new(sender); + let s = Arc::downgrade(&sender); + + // Wrap the future into one that disconnects the channel on completion. + let future = async move { + // When the inner future completes, the sender gets dropped and disconnects the channel. + let _sender = sender; + future.await + }; + + // Create a task that is scheduled by sending it into the channel. + let schedule = move |runnable| s.upgrade().unwrap().send(runnable).unwrap(); + let (runnable, task) = async_task::spawn(future, schedule); + + // Schedule the task by sending it into the channel. + runnable.schedule(); + + // Spawn a thread running the task to completion. + thread::spawn(move || { + // Keep taking the task from the channel and running it until completion. + for runnable in receiver { + runnable.run(); + } + }); + + task +} + +fn main() { + // Spawn a future on a dedicated thread. + future::block_on(spawn_on_thread(async { + println!("Hello, world!"); + })); +} diff --git a/examples/spawn.rs b/examples/spawn.rs new file mode 100644 index 0000000..3a64811 --- /dev/null +++ b/examples/spawn.rs @@ -0,0 +1,48 @@ +//! A simple single-threaded executor. + +use std::future::Future; +use std::panic::catch_unwind; +use std::thread; + +use async_task::{Runnable, Task}; +use once_cell::sync::Lazy; +use smol::future; + +/// Spawns a future on the executor. +fn spawn<F, T>(future: F) -> Task<T> +where + F: Future<Output = T> + Send + 'static, + T: Send + 'static, +{ + // A queue that holds scheduled tasks. + static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| { + let (sender, receiver) = flume::unbounded::<Runnable>(); + + // Start the executor thread. + thread::spawn(|| { + for runnable in receiver { + // Ignore panics inside futures. + let _ignore_panic = catch_unwind(|| runnable.run()); + } + }); + + sender + }); + + // Create a task that is scheduled by pushing it into the queue. + let schedule = |runnable| QUEUE.send(runnable).unwrap(); + let (runnable, task) = async_task::spawn(future, schedule); + + // Schedule the task by pushing it into the queue. + runnable.schedule(); + + task +} + +fn main() { + // Spawn a future and await its result. + let task = spawn(async { + println!("Hello, world!"); + }); + future::block_on(task); +} diff --git a/src/header.rs b/src/header.rs new file mode 100644 index 0000000..8a3a0b9 --- /dev/null +++ b/src/header.rs @@ -0,0 +1,162 @@ +use core::cell::UnsafeCell; +use core::fmt; +use core::sync::atomic::{AtomicUsize, Ordering}; +use core::task::Waker; + +use crate::raw::TaskVTable; +use crate::state::*; +use crate::utils::abort_on_panic; + +/// The header of a task. +/// +/// This header is stored in memory at the beginning of the heap-allocated task. +pub(crate) struct Header { + /// Current state of the task. + /// + /// Contains flags representing the current state and the reference count. + pub(crate) state: AtomicUsize, + + /// The task that is blocked on the `Task` handle. + /// + /// This waker needs to be woken up once the task completes or is closed. + pub(crate) awaiter: UnsafeCell<Option<Waker>>, + + /// The virtual table. + /// + /// In addition to the actual waker virtual table, it also contains pointers to several other + /// methods necessary for bookkeeping the heap-allocated task. + pub(crate) vtable: &'static TaskVTable, +} + +impl Header { + /// Notifies the awaiter blocked on this task. + /// + /// If the awaiter is the same as the current waker, it will not be notified. + #[inline] + pub(crate) fn notify(&self, current: Option<&Waker>) { + if let Some(w) = self.take(current) { + abort_on_panic(|| w.wake()); + } + } + + /// Takes the awaiter blocked on this task. + /// + /// If there is no awaiter or if it is the same as the current waker, returns `None`. + #[inline] + pub(crate) fn take(&self, current: Option<&Waker>) -> Option<Waker> { + // Set the bit indicating that the task is notifying its awaiter. + let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel); + + // If the task was not notifying or registering an awaiter... + if state & (NOTIFYING | REGISTERING) == 0 { + // Take the waker out. + let waker = unsafe { (*self.awaiter.get()).take() }; + + // Unset the bit indicating that the task is notifying its awaiter. + self.state + .fetch_and(!NOTIFYING & !AWAITER, Ordering::Release); + + // Finally, notify the waker if it's different from the current waker. + if let Some(w) = waker { + match current { + None => return Some(w), + Some(c) if !w.will_wake(c) => return Some(w), + Some(_) => abort_on_panic(|| drop(w)), + } + } + } + + None + } + + /// Registers a new awaiter blocked on this task. + /// + /// This method is called when `Task` is polled and it has not yet completed. + #[inline] + pub(crate) fn register(&self, waker: &Waker) { + // Load the state and synchronize with it. + let mut state = self.state.fetch_or(0, Ordering::Acquire); + + loop { + // There can't be two concurrent registrations because `Task` can only be polled + // by a unique pinned reference. + debug_assert!(state & REGISTERING == 0); + + // If we're in the notifying state at this moment, just wake and return without + // registering. + if state & NOTIFYING != 0 { + abort_on_panic(|| waker.wake_by_ref()); + return; + } + + // Mark the state to let other threads know we're registering a new awaiter. + match self.state.compare_exchange_weak( + state, + state | REGISTERING, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + state |= REGISTERING; + break; + } + Err(s) => state = s, + } + } + + // Put the waker into the awaiter field. + unsafe { + abort_on_panic(|| (*self.awaiter.get()) = Some(waker.clone())); + } + + // This variable will contain the newly registered waker if a notification comes in before + // we complete registration. + let mut waker = None; + + loop { + // If there was a notification, take the waker out of the awaiter field. + if state & NOTIFYING != 0 { + if let Some(w) = unsafe { (*self.awaiter.get()).take() } { + abort_on_panic(|| waker = Some(w)); + } + } + + // The new state is not being notified nor registered, but there might or might not be + // an awaiter depending on whether there was a concurrent notification. + let new = if waker.is_none() { + (state & !NOTIFYING & !REGISTERING) | AWAITER + } else { + state & !NOTIFYING & !REGISTERING & !AWAITER + }; + + match self + .state + .compare_exchange_weak(state, new, Ordering::AcqRel, Ordering::Acquire) + { + Ok(_) => break, + Err(s) => state = s, + } + } + + // If there was a notification during registration, wake the awaiter now. + if let Some(w) = waker { + abort_on_panic(|| w.wake()); + } + } +} + +impl fmt::Debug for Header { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let state = self.state.load(Ordering::SeqCst); + + f.debug_struct("Header") + .field("scheduled", &(state & SCHEDULED != 0)) + .field("running", &(state & RUNNING != 0)) + .field("completed", &(state & COMPLETED != 0)) + .field("closed", &(state & CLOSED != 0)) + .field("awaiter", &(state & AWAITER != 0)) + .field("task", &(state & TASK != 0)) + .field("ref_count", &(state / REFERENCE)) + .finish() + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..dd689ec --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,99 @@ +//! Task abstraction for building executors. +//! +//! To spawn a future onto an executor, we first need to allocate it on the heap and keep some +//! state attached to it. The state indicates whether the future is ready for polling, waiting to +//! be woken up, or completed. Such a stateful future is called a *task*. +//! +//! All executors have a queue that holds scheduled tasks: +//! +//! ``` +//! let (sender, receiver) = flume::unbounded(); +//! # +//! # // A future that will get spawned. +//! # let future = async { 1 + 2 }; +//! # +//! # // A function that schedules the task when it gets woken up. +//! # let schedule = move |runnable| sender.send(runnable).unwrap(); +//! # +//! # // Create a task. +//! # let (runnable, task) = async_task::spawn(future, schedule); +//! ``` +//! +//! A task is created using either [`spawn()`], [`spawn_local()`], or [`spawn_unchecked()`] which +//! return a [`Runnable`] and a [`Task`]: +//! +//! ``` +//! # let (sender, receiver) = flume::unbounded(); +//! # +//! // A future that will be spawned. +//! let future = async { 1 + 2 }; +//! +//! // A function that schedules the task when it gets woken up. +//! let schedule = move |runnable| sender.send(runnable).unwrap(); +//! +//! // Construct a task. +//! let (runnable, task) = async_task::spawn(future, schedule); +//! +//! // Push the task into the queue by invoking its schedule function. +//! runnable.schedule(); +//! ``` +//! +//! The [`Runnable`] is used to poll the task's future, and the [`Task`] is used to await its +//! output. +//! +//! Finally, we need a loop that takes scheduled tasks from the queue and runs them: +//! +//! ```no_run +//! # let (sender, receiver) = flume::unbounded(); +//! # +//! # // A future that will get spawned. +//! # let future = async { 1 + 2 }; +//! # +//! # // A function that schedules the task when it gets woken up. +//! # let schedule = move |runnable| sender.send(runnable).unwrap(); +//! # +//! # // Create a task. +//! # let (runnable, task) = async_task::spawn(future, schedule); +//! # +//! # // Push the task into the queue by invoking its schedule function. +//! # runnable.schedule(); +//! # +//! for runnable in receiver { +//! runnable.run(); +//! } +//! ``` +//! +//! Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] +//! vanishes and only reappears when its [`Waker`][`core::task::Waker`] wakes the task, thus +//! scheduling it to be run again. + +#![cfg_attr(not(feature = "std"), no_std)] +#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +#![doc(test(attr(deny(rust_2018_idioms, warnings))))] +#![doc(test(attr(allow(unused_extern_crates, unused_variables))))] + +extern crate alloc; + +/// We can't use `?` in const contexts yet, so this macro acts +/// as a workaround. +macro_rules! leap { + ($x: expr) => {{ + match ($x) { + Some(val) => val, + None => return None, + } + }}; +} + +mod header; +mod raw; +mod runnable; +mod state; +mod task; +mod utils; + +pub use crate::runnable::{spawn, spawn_unchecked, Runnable}; +pub use crate::task::{FallibleTask, Task}; + +#[cfg(feature = "std")] +pub use crate::runnable::spawn_local; diff --git a/src/raw.rs b/src/raw.rs new file mode 100644 index 0000000..bb031da --- /dev/null +++ b/src/raw.rs @@ -0,0 +1,707 @@ +use alloc::alloc::Layout as StdLayout; +use core::cell::UnsafeCell; +use core::future::Future; +use core::mem::{self, ManuallyDrop}; +use core::pin::Pin; +use core::ptr::NonNull; +use core::sync::atomic::{AtomicUsize, Ordering}; +use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +use crate::header::Header; +use crate::state::*; +use crate::utils::{abort, abort_on_panic, max, Layout}; +use crate::Runnable; + +/// The vtable for a task. +pub(crate) struct TaskVTable { + /// Schedules the task. + pub(crate) schedule: unsafe fn(*const ()), + + /// Drops the future inside the task. + pub(crate) drop_future: unsafe fn(*const ()), + + /// Returns a pointer to the output stored after completion. + pub(crate) get_output: unsafe fn(*const ()) -> *const (), + + /// Drops the task reference (`Runnable` or `Waker`). + pub(crate) drop_ref: unsafe fn(ptr: *const ()), + + /// Destroys the task. + pub(crate) destroy: unsafe fn(*const ()), + + /// Runs the task. + pub(crate) run: unsafe fn(*const ()) -> bool, + + /// Creates a new waker associated with the task. + pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker, + + /// The memory layout of the task. This information enables + /// debuggers to decode raw task memory blobs. Do not remove + /// the field, even if it appears to be unused. + #[allow(unused)] + pub(crate) layout_info: &'static Option<TaskLayout>, +} + +/// Memory layout of a task. +/// +/// This struct contains the following information: +/// +/// 1. How to allocate and deallocate the task. +/// 2. How to access the fields inside the task. +#[derive(Clone, Copy)] +pub(crate) struct TaskLayout { + /// Memory layout of the whole task. + pub(crate) layout: StdLayout, + + /// Offset into the task at which the schedule function is stored. + pub(crate) offset_s: usize, + + /// Offset into the task at which the future is stored. + pub(crate) offset_f: usize, + + /// Offset into the task at which the output is stored. + pub(crate) offset_r: usize, +} + +/// Raw pointers to the fields inside a task. +pub(crate) struct RawTask<F, T, S> { + /// The task header. + pub(crate) header: *const Header, + + /// The schedule function. + pub(crate) schedule: *const S, + + /// The future. + pub(crate) future: *mut F, + + /// The output of the future. + pub(crate) output: *mut T, +} + +impl<F, T, S> Copy for RawTask<F, T, S> {} + +impl<F, T, S> Clone for RawTask<F, T, S> { + fn clone(&self) -> Self { + *self + } +} + +impl<F, T, S> RawTask<F, T, S> { + const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout(); + + /// Computes the memory layout for a task. + #[inline] + const fn eval_task_layout() -> Option<TaskLayout> { + // Compute the layouts for `Header`, `S`, `F`, and `T`. + let layout_header = Layout::new::<Header>(); + let layout_s = Layout::new::<S>(); + let layout_f = Layout::new::<F>(); + let layout_r = Layout::new::<T>(); + + // Compute the layout for `union { F, T }`. + let size_union = max(layout_f.size(), layout_r.size()); + let align_union = max(layout_f.align(), layout_r.align()); + let layout_union = Layout::from_size_align(size_union, align_union); + + // Compute the layout for `Header` followed `S` and `union { F, T }`. + let layout = layout_header; + let (layout, offset_s) = leap!(layout.extend(layout_s)); + let (layout, offset_union) = leap!(layout.extend(layout_union)); + let offset_f = offset_union; + let offset_r = offset_union; + + Some(TaskLayout { + layout: unsafe { layout.into_std() }, + offset_s, + offset_f, + offset_r, + }) + } +} + +impl<F, T, S> RawTask<F, T, S> +where + F: Future<Output = T>, + S: Fn(Runnable), +{ + const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + Self::clone_waker, + Self::wake, + Self::wake_by_ref, + Self::drop_waker, + ); + + /// Allocates a task with the given `future` and `schedule` function. + /// + /// It is assumed that initially only the `Runnable` and the `Task` exist. + pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> { + // Compute the layout of the task for allocation. Abort if the computation fails. + // + // n.b. notgull: task_layout now automatically aborts instead of panicking + let task_layout = Self::task_layout(); + + unsafe { + // Allocate enough space for the entire task. + let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) { + None => abort(), + Some(p) => p, + }; + + let raw = Self::from_ptr(ptr.as_ptr()); + + // Write the header as the first field of the task. + (raw.header as *mut Header).write(Header { + state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE), + awaiter: UnsafeCell::new(None), + vtable: &TaskVTable { + schedule: Self::schedule, + drop_future: Self::drop_future, + get_output: Self::get_output, + drop_ref: Self::drop_ref, + destroy: Self::destroy, + run: Self::run, + clone_waker: Self::clone_waker, + layout_info: &Self::TASK_LAYOUT, + }, + }); + + // Write the schedule function as the third field of the task. + (raw.schedule as *mut S).write(schedule); + + // Write the future as the fourth field of the task. + raw.future.write(future); + + ptr + } + } + + /// Creates a `RawTask` from a raw task pointer. + #[inline] + pub(crate) fn from_ptr(ptr: *const ()) -> Self { + let task_layout = Self::task_layout(); + let p = ptr as *const u8; + + unsafe { + Self { + header: p as *const Header, + schedule: p.add(task_layout.offset_s) as *const S, + future: p.add(task_layout.offset_f) as *mut F, + output: p.add(task_layout.offset_r) as *mut T, + } + } + } + + /// Returns the layout of the task. + #[inline] + fn task_layout() -> TaskLayout { + match Self::TASK_LAYOUT { + Some(tl) => tl, + None => abort(), + } + } + + /// Wakes a waker. + unsafe fn wake(ptr: *const ()) { + // This is just an optimization. If the schedule function has captured variables, then + // we'll do less reference counting if we wake the waker by reference and then drop it. + if mem::size_of::<S>() > 0 { + Self::wake_by_ref(ptr); + Self::drop_waker(ptr); + return; + } + + let raw = Self::from_ptr(ptr); + + let mut state = (*raw.header).state.load(Ordering::Acquire); + + loop { + // If the task is completed or closed, it can't be woken up. + if state & (COMPLETED | CLOSED) != 0 { + // Drop the waker. + Self::drop_waker(ptr); + break; + } + + // If the task is already scheduled, we just need to synchronize with the thread that + // will run the task by "publishing" our current view of the memory. + if state & SCHEDULED != 0 { + // Update the state without actually modifying it. + match (*raw.header).state.compare_exchange_weak( + state, + state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Drop the waker. + Self::drop_waker(ptr); + break; + } + Err(s) => state = s, + } + } else { + // Mark the task as scheduled. + match (*raw.header).state.compare_exchange_weak( + state, + state | SCHEDULED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not yet scheduled and isn't currently running, now is the + // time to schedule it. + if state & RUNNING == 0 { + // Schedule the task. + Self::schedule(ptr); + } else { + // Drop the waker. + Self::drop_waker(ptr); + } + + break; + } + Err(s) => state = s, + } + } + } + } + + /// Wakes a waker by reference. + unsafe fn wake_by_ref(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + let mut state = (*raw.header).state.load(Ordering::Acquire); + + loop { + // If the task is completed or closed, it can't be woken up. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + + // If the task is already scheduled, we just need to synchronize with the thread that + // will run the task by "publishing" our current view of the memory. + if state & SCHEDULED != 0 { + // Update the state without actually modifying it. + match (*raw.header).state.compare_exchange_weak( + state, + state, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(s) => state = s, + } + } else { + // If the task is not running, we can schedule right away. + let new = if state & RUNNING == 0 { + (state | SCHEDULED) + REFERENCE + } else { + state | SCHEDULED + }; + + // Mark the task as scheduled. + match (*raw.header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not running, now is the time to schedule. + if state & RUNNING == 0 { + // If the reference count overflowed, abort. + if state > isize::max_value() as usize { + abort(); + } + + // Schedule the task. There is no need to call `Self::schedule(ptr)` + // because the schedule function cannot be destroyed while the waker is + // still alive. + let task = Runnable { + ptr: NonNull::new_unchecked(ptr as *mut ()), + }; + (*raw.schedule)(task); + } + + break; + } + Err(s) => state = s, + } + } + } + } + + /// Clones a waker. + unsafe fn clone_waker(ptr: *const ()) -> RawWaker { + let raw = Self::from_ptr(ptr); + + // Increment the reference count. With any kind of reference-counted data structure, + // relaxed ordering is appropriate when incrementing the counter. + let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); + + // If the reference count overflowed, abort. + if state > isize::max_value() as usize { + abort(); + } + + RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE) + } + + /// Drops a waker. + /// + /// This function will decrement the reference count. If it drops down to zero, the associated + /// `Task` has been dropped too, and the task has not been completed, then it will get + /// scheduled one more time so that its future gets dropped by the executor. + #[inline] + unsafe fn drop_waker(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // Decrement the reference count. + let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; + + // If this was the last reference to the task and the `Task` has been dropped too, + // then we need to decide how to destroy the task. + if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { + if new & (COMPLETED | CLOSED) == 0 { + // If the task was not completed nor closed, close it and schedule one more time so + // that its future gets dropped by the executor. + (*raw.header) + .state + .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); + Self::schedule(ptr); + } else { + // Otherwise, destroy the task right away. + Self::destroy(ptr); + } + } + } + + /// Drops a task reference (`Runnable` or `Waker`). + /// + /// This function will decrement the reference count. If it drops down to zero and the + /// associated `Task` handle has been dropped too, then the task gets destroyed. + #[inline] + unsafe fn drop_ref(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // Decrement the reference count. + let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; + + // If this was the last reference to the task and the `Task` has been dropped too, + // then destroy the task. + if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { + Self::destroy(ptr); + } + } + + /// Schedules a task for running. + /// + /// This function doesn't modify the state of the task. It only passes the task reference to + /// its schedule function. + unsafe fn schedule(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // If the schedule function has captured variables, create a temporary waker that prevents + // the task from getting deallocated while the function is being invoked. + let _waker; + if mem::size_of::<S>() > 0 { + _waker = Waker::from_raw(Self::clone_waker(ptr)); + } + + let task = Runnable { + ptr: NonNull::new_unchecked(ptr as *mut ()), + }; + (*raw.schedule)(task); + } + + /// Drops the future inside a task. + #[inline] + unsafe fn drop_future(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + + // We need a safeguard against panics because the destructor can panic. + abort_on_panic(|| { + raw.future.drop_in_place(); + }) + } + + /// Returns a pointer to the output inside a task. + unsafe fn get_output(ptr: *const ()) -> *const () { + let raw = Self::from_ptr(ptr); + raw.output as *const () + } + + /// Cleans up task's resources and deallocates it. + /// + /// The schedule function will be dropped, and the task will then get deallocated. + /// The task must be closed before this function is called. + #[inline] + unsafe fn destroy(ptr: *const ()) { + let raw = Self::from_ptr(ptr); + let task_layout = Self::task_layout(); + + // We need a safeguard against panics because destructors can panic. + abort_on_panic(|| { + // Drop the schedule function. + (raw.schedule as *mut S).drop_in_place(); + }); + + // Finally, deallocate the memory reserved by the task. + alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout); + } + + /// Runs a task. + /// + /// If polling its future panics, the task will be closed and the panic will be propagated into + /// the caller. + unsafe fn run(ptr: *const ()) -> bool { + let raw = Self::from_ptr(ptr); + + // Create a context from the raw task pointer and the vtable inside the its header. + let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE))); + let cx = &mut Context::from_waker(&waker); + + let mut state = (*raw.header).state.load(Ordering::Acquire); + + // Update the task's state before polling its future. + loop { + // If the task has already been closed, drop the task reference and return. + if state & CLOSED != 0 { + // Drop the future. + Self::drop_future(ptr); + + // Mark the task as unscheduled. + let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = (*raw.header).take(None); + } + + // Drop the task reference. + Self::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + return false; + } + + // Mark the task as unscheduled and running. + match (*raw.header).state.compare_exchange_weak( + state, + (state & !SCHEDULED) | RUNNING, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Update the state because we're continuing with polling the future. + state = (state & !SCHEDULED) | RUNNING; + break; + } + Err(s) => state = s, + } + } + + // Poll the inner future, but surround it with a guard that closes the task in case polling + // panics. + let guard = Guard(raw); + let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx); + mem::forget(guard); + + match poll { + Poll::Ready(out) => { + // Replace the future with its output. + Self::drop_future(ptr); + raw.output.write(out); + + // The task is now completed. + loop { + // If the `Task` is dropped, we'll need to close it and drop the output. + let new = if state & TASK == 0 { + (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED + } else { + (state & !RUNNING & !SCHEDULED) | COMPLETED + }; + + // Mark the task as not running and completed. + match (*raw.header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the `Task` is dropped or if the task was closed while running, + // now it's time to drop the output. + if state & TASK == 0 || state & CLOSED != 0 { + // Drop the output. + abort_on_panic(|| raw.output.drop_in_place()); + } + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = (*raw.header).take(None); + } + + // Drop the task reference. + Self::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + break; + } + Err(s) => state = s, + } + } + } + Poll::Pending => { + let mut future_dropped = false; + + // The task is still not completed. + loop { + // If the task was closed while running, we'll need to unschedule in case it + // was woken up and then destroy it. + let new = if state & CLOSED != 0 { + state & !RUNNING & !SCHEDULED + } else { + state & !RUNNING + }; + + if state & CLOSED != 0 && !future_dropped { + // The thread that closed the task didn't drop the future because it was + // running so now it's our responsibility to do so. + Self::drop_future(ptr); + future_dropped = true; + } + + // Mark the task as not running. + match (*raw.header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(state) => { + // If the task was closed while running, we need to notify the awaiter. + // If the task was woken up while running, we need to schedule it. + // Otherwise, we just drop the task reference. + if state & CLOSED != 0 { + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = (*raw.header).take(None); + } + + // Drop the task reference. + Self::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + } else if state & SCHEDULED != 0 { + // The thread that woke the task up didn't reschedule it because + // it was running so now it's our responsibility to do so. + Self::schedule(ptr); + return true; + } else { + // Drop the task reference. + Self::drop_ref(ptr); + } + break; + } + Err(s) => state = s, + } + } + } + } + + return false; + + /// A guard that closes the task if polling its future panics. + struct Guard<F, T, S>(RawTask<F, T, S>) + where + F: Future<Output = T>, + S: Fn(Runnable); + + impl<F, T, S> Drop for Guard<F, T, S> + where + F: Future<Output = T>, + S: Fn(Runnable), + { + fn drop(&mut self) { + let raw = self.0; + let ptr = raw.header as *const (); + + unsafe { + let mut state = (*raw.header).state.load(Ordering::Acquire); + + loop { + // If the task was closed while running, then unschedule it, drop its + // future, and drop the task reference. + if state & CLOSED != 0 { + // The thread that closed the task didn't drop the future because it + // was running so now it's our responsibility to do so. + RawTask::<F, T, S>::drop_future(ptr); + + // Mark the task as not running and not scheduled. + (*raw.header) + .state + .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = (*raw.header).take(None); + } + + // Drop the task reference. + RawTask::<F, T, S>::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + break; + } + + // Mark the task as not running, not scheduled, and closed. + match (*raw.header).state.compare_exchange_weak( + state, + (state & !RUNNING & !SCHEDULED) | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(state) => { + // Drop the future because the task is now closed. + RawTask::<F, T, S>::drop_future(ptr); + + // Take the awaiter out. + let mut awaiter = None; + if state & AWAITER != 0 { + awaiter = (*raw.header).take(None); + } + + // Drop the task reference. + RawTask::<F, T, S>::drop_ref(ptr); + + // Notify the awaiter that the future has been dropped. + if let Some(w) = awaiter { + abort_on_panic(|| w.wake()); + } + break; + } + Err(s) => state = s, + } + } + } + } + } + } +} diff --git a/src/runnable.rs b/src/runnable.rs new file mode 100644 index 0000000..cb70ef3 --- /dev/null +++ b/src/runnable.rs @@ -0,0 +1,398 @@ +use core::fmt; +use core::future::Future; +use core::marker::PhantomData; +use core::mem; +use core::ptr::NonNull; +use core::sync::atomic::Ordering; +use core::task::Waker; + +use crate::header::Header; +use crate::raw::RawTask; +use crate::state::*; +use crate::Task; + +/// Creates a new task. +/// +/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its +/// output. +/// +/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] +/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run +/// again. +/// +/// When the task is woken, its [`Runnable`] is passed to the `schedule` function. +/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it +/// should push it into a task queue so that it can be processed later. +/// +/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider +/// using [`spawn_local()`] or [`spawn_unchecked()`] instead. +/// +/// # Examples +/// +/// ``` +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // A function that schedules the task when it gets woken up. +/// let (s, r) = flume::unbounded(); +/// let schedule = move |runnable| s.send(runnable).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = async_task::spawn(future, schedule); +/// ``` +pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) +where + F: Future + Send + 'static, + F::Output: Send + 'static, + S: Fn(Runnable) + Send + Sync + 'static, +{ + unsafe { spawn_unchecked(future, schedule) } +} + +/// Creates a new thread-local task. +/// +/// This function is same as [`spawn()`], except it does not require [`Send`] on `future`. If the +/// [`Runnable`] is used or dropped on another thread, a panic will occur. +/// +/// This function is only available when the `std` feature for this crate is enabled. +/// +/// # Examples +/// +/// ``` +/// use async_task::Runnable; +/// use flume::{Receiver, Sender}; +/// use std::rc::Rc; +/// +/// thread_local! { +/// // A queue that holds scheduled tasks. +/// static QUEUE: (Sender<Runnable>, Receiver<Runnable>) = flume::unbounded(); +/// } +/// +/// // Make a non-Send future. +/// let msg: Rc<str> = "Hello, world!".into(); +/// let future = async move { +/// println!("{}", msg); +/// }; +/// +/// // A function that schedules the task when it gets woken up. +/// let s = QUEUE.with(|(s, _)| s.clone()); +/// let schedule = move |runnable| s.send(runnable).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = async_task::spawn_local(future, schedule); +/// ``` +#[cfg(feature = "std")] +pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) +where + F: Future + 'static, + F::Output: 'static, + S: Fn(Runnable) + Send + Sync + 'static, +{ + use std::mem::ManuallyDrop; + use std::pin::Pin; + use std::task::{Context, Poll}; + use std::thread::{self, ThreadId}; + + #[inline] + fn thread_id() -> ThreadId { + thread_local! { + static ID: ThreadId = thread::current().id(); + } + ID.try_with(|id| *id) + .unwrap_or_else(|_| thread::current().id()) + } + + struct Checked<F> { + id: ThreadId, + inner: ManuallyDrop<F>, + } + + impl<F> Drop for Checked<F> { + fn drop(&mut self) { + assert!( + self.id == thread_id(), + "local task dropped by a thread that didn't spawn it" + ); + unsafe { + ManuallyDrop::drop(&mut self.inner); + } + } + } + + impl<F: Future> Future for Checked<F> { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + assert!( + self.id == thread_id(), + "local task polled by a thread that didn't spawn it" + ); + unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) } + } + } + + // Wrap the future into one that checks which thread it's on. + let future = Checked { + id: thread_id(), + inner: ManuallyDrop::new(future), + }; + + unsafe { spawn_unchecked(future, schedule) } +} + +/// Creates a new task without [`Send`], [`Sync`], and `'static` bounds. +/// +/// This function is same as [`spawn()`], except it does not require [`Send`], [`Sync`], and +/// `'static` on `future` and `schedule`. +/// +/// # Safety +/// +/// - If `future` is not [`Send`], its [`Runnable`] must be used and dropped on the original +/// thread. +/// - If `future` is not `'static`, borrowed variables must outlive its [`Runnable`]. +/// - If `schedule` is not [`Send`] and [`Sync`], the task's [`Waker`] must be used and dropped on +/// the original thread. +/// - If `schedule` is not `'static`, borrowed variables must outlive the task's [`Waker`]. +/// +/// # Examples +/// +/// ``` +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // If the task gets woken up, it will be sent into this channel. +/// let (s, r) = flume::unbounded(); +/// let schedule = move |runnable| s.send(runnable).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) }; +/// ``` +pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>) +where + F: Future, + S: Fn(Runnable), +{ + // Allocate large futures on the heap. + let ptr = if mem::size_of::<F>() >= 2048 { + let future = alloc::boxed::Box::pin(future); + RawTask::<_, F::Output, S>::allocate(future, schedule) + } else { + RawTask::<F, F::Output, S>::allocate(future, schedule) + }; + + let runnable = Runnable { ptr }; + let task = Task { + ptr, + _marker: PhantomData, + }; + (runnable, task) +} + +/// A handle to a runnable task. +/// +/// Every spawned task has a single [`Runnable`] handle, which only exists when the task is +/// scheduled for running. +/// +/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] +/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run +/// again. +/// +/// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and +/// awaiting the [`Task`] after that will result in a panic. +/// +/// # Examples +/// +/// ``` +/// use async_task::Runnable; +/// use once_cell::sync::Lazy; +/// use std::{panic, thread}; +/// +/// // A simple executor. +/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| { +/// let (sender, receiver) = flume::unbounded::<Runnable>(); +/// thread::spawn(|| { +/// for runnable in receiver { +/// let _ignore_panic = panic::catch_unwind(|| runnable.run()); +/// } +/// }); +/// sender +/// }); +/// +/// // Create a task with a simple future. +/// let schedule = |runnable| QUEUE.send(runnable).unwrap(); +/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule); +/// +/// // Schedule the task and await its output. +/// runnable.schedule(); +/// assert_eq!(smol::future::block_on(task), 3); +/// ``` +pub struct Runnable { + /// A pointer to the heap-allocated task. + pub(crate) ptr: NonNull<()>, +} + +unsafe impl Send for Runnable {} +unsafe impl Sync for Runnable {} + +#[cfg(feature = "std")] +impl std::panic::UnwindSafe for Runnable {} +#[cfg(feature = "std")] +impl std::panic::RefUnwindSafe for Runnable {} + +impl Runnable { + /// Schedules the task. + /// + /// This is a convenience method that passes the [`Runnable`] to the schedule function. + /// + /// # Examples + /// + /// ``` + /// // A function that schedules the task when it gets woken up. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with a simple future and the schedule function. + /// let (runnable, task) = async_task::spawn(async {}, schedule); + /// + /// // Schedule the task. + /// assert_eq!(r.len(), 0); + /// runnable.schedule(); + /// assert_eq!(r.len(), 1); + /// ``` + pub fn schedule(self) { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + mem::forget(self); + + unsafe { + ((*header).vtable.schedule)(ptr); + } + } + + /// Runs the task by polling its future. + /// + /// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets + /// rescheduled at the end of this method invocation. Otherwise, returns `false` and the + /// [`Runnable`] vanishes until the task is woken. + /// The return value is just a hint: `true` usually indicates that the task has yielded, i.e. + /// it woke itself and then gave the control back to the executor. + /// + /// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then + /// this method simply destroys the task. + /// + /// If the polled future panics, this method propagates the panic, and awaiting the [`Task`] + /// after that will also result in a panic. + /// + /// # Examples + /// + /// ``` + /// // A function that schedules the task when it gets woken up. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with a simple future and the schedule function. + /// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule); + /// + /// // Run the task and check its output. + /// runnable.run(); + /// assert_eq!(smol::future::block_on(task), 3); + /// ``` + pub fn run(self) -> bool { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + mem::forget(self); + + unsafe { ((*header).vtable.run)(ptr) } + } + + /// Returns a waker associated with this task. + /// + /// # Examples + /// + /// ``` + /// use smol::future; + /// + /// // A function that schedules the task when it gets woken up. + /// let (s, r) = flume::unbounded(); + /// let schedule = move |runnable| s.send(runnable).unwrap(); + /// + /// // Create a task with a simple future and the schedule function. + /// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule); + /// + /// // Take a waker and run the task. + /// let waker = runnable.waker(); + /// runnable.run(); + /// + /// // Reschedule the task by waking it. + /// assert_eq!(r.len(), 0); + /// waker.wake(); + /// assert_eq!(r.len(), 1); + /// ``` + pub fn waker(&self) -> Waker { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + + unsafe { + let raw_waker = ((*header).vtable.clone_waker)(ptr); + Waker::from_raw(raw_waker) + } + } +} + +impl Drop for Runnable { + fn drop(&mut self) { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + + unsafe { + let mut state = (*header).state.load(Ordering::Acquire); + + loop { + // If the task has been completed or closed, it can't be canceled. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + + // Mark the task as closed. + match (*header).state.compare_exchange_weak( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(s) => state = s, + } + } + + // Drop the future. + ((*header).vtable.drop_future)(ptr); + + // Mark the task as unscheduled. + let state = (*header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); + + // Notify the awaiter that the future has been dropped. + if state & AWAITER != 0 { + (*header).notify(None); + } + + // Drop the task reference. + ((*header).vtable.drop_ref)(ptr); + } + } +} + +impl fmt::Debug for Runnable { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + + f.debug_struct("Runnable") + .field("header", unsafe { &(*header) }) + .finish() + } +} diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..2fc6cf3 --- /dev/null +++ b/src/state.rs @@ -0,0 +1,69 @@ +/// Set if the task is scheduled for running. +/// +/// A task is considered to be scheduled whenever its `Runnable` exists. +/// +/// This flag can't be set when the task is completed. However, it can be set while the task is +/// running, in which case it will be rescheduled as soon as polling finishes. +pub(crate) const SCHEDULED: usize = 1 << 0; + +/// Set if the task is running. +/// +/// A task is in running state while its future is being polled. +/// +/// This flag can't be set when the task is completed. However, it can be in scheduled state while +/// it is running, in which case it will be rescheduled as soon as polling finishes. +pub(crate) const RUNNING: usize = 1 << 1; + +/// Set if the task has been completed. +/// +/// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored +/// inside the task until it becomes closed. In fact, `Task` picks up the output by marking +/// the task as closed. +/// +/// This flag can't be set when the task is scheduled or running. +pub(crate) const COMPLETED: usize = 1 << 2; + +/// Set if the task is closed. +/// +/// If a task is closed, that means it's either canceled or its output has been consumed by the +/// `Task`. A task becomes closed in the following cases: +/// +/// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. +/// 2. Its output gets awaited by the `Task`. +/// 3. It panics while polling the future. +/// 4. It is completed and the `Task` gets dropped. +pub(crate) const CLOSED: usize = 1 << 3; + +/// Set if the `Task` still exists. +/// +/// The `Task` is a special case in that it is only tracked by this flag, while all other +/// task references (`Runnable` and `Waker`s) are tracked by the reference count. +pub(crate) const TASK: usize = 1 << 4; + +/// Set if the `Task` is awaiting the output. +/// +/// This flag is set while there is a registered awaiter of type `Waker` inside the task. When the +/// task gets closed or completed, we need to wake the awaiter. This flag can be used as a fast +/// check that tells us if we need to wake anyone. +pub(crate) const AWAITER: usize = 1 << 5; + +/// Set if an awaiter is being registered. +/// +/// This flag is set when `Task` is polled and we are registering a new awaiter. +pub(crate) const REGISTERING: usize = 1 << 6; + +/// Set if the awaiter is being notified. +/// +/// This flag is set when notifying the awaiter. If an awaiter is concurrently registered and +/// notified, whichever side came first will take over the reposibility of resolving the race. +pub(crate) const NOTIFYING: usize = 1 << 7; + +/// A single reference. +/// +/// The lower bits in the state contain various flags representing the task state, while the upper +/// bits contain the reference count. The value of `REFERENCE` represents a single reference in the +/// total reference count. +/// +/// Note that the reference counter only tracks the `Runnable` and `Waker`s. The `Task` is +/// tracked separately by the `TASK` flag. +pub(crate) const REFERENCE: usize = 1 << 8; diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 0000000..8ecd746 --- /dev/null +++ b/src/task.rs @@ -0,0 +1,532 @@ +use core::fmt; +use core::future::Future; +use core::marker::{PhantomData, Unpin}; +use core::mem; +use core::pin::Pin; +use core::ptr::NonNull; +use core::sync::atomic::Ordering; +use core::task::{Context, Poll}; + +use crate::header::Header; +use crate::state::*; + +/// A spawned task. +/// +/// A [`Task`] can be awaited to retrieve the output of its future. +/// +/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the +/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a +/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()] +/// method. +/// +/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor +/// can destroy the task by simply dropping its [`Runnable`][`super::Runnable`] or by invoking +/// [`run()`][`super::Runnable::run()`]. +/// +/// # Examples +/// +/// ``` +/// use smol::{future, Executor}; +/// use std::thread; +/// +/// let ex = Executor::new(); +/// +/// // Spawn a future onto the executor. +/// let task = ex.spawn(async { +/// println!("Hello from a task!"); +/// 1 + 2 +/// }); +/// +/// // Run an executor thread. +/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); +/// +/// // Wait for the task's output. +/// assert_eq!(future::block_on(task), 3); +/// ``` +#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] +pub struct Task<T> { + /// A raw task pointer. + pub(crate) ptr: NonNull<()>, + + /// A marker capturing generic type `T`. + pub(crate) _marker: PhantomData<T>, +} + +unsafe impl<T: Send> Send for Task<T> {} +unsafe impl<T> Sync for Task<T> {} + +impl<T> Unpin for Task<T> {} + +#[cfg(feature = "std")] +impl<T> std::panic::UnwindSafe for Task<T> {} +#[cfg(feature = "std")] +impl<T> std::panic::RefUnwindSafe for Task<T> {} + +impl<T> Task<T> { + /// Detaches the task to let it keep running in the background. + /// + /// # Examples + /// + /// ``` + /// use smol::{Executor, Timer}; + /// use std::time::Duration; + /// + /// let ex = Executor::new(); + /// + /// // Spawn a deamon future. + /// ex.spawn(async { + /// loop { + /// println!("I'm a daemon task looping forever."); + /// Timer::after(Duration::from_secs(1)).await; + /// } + /// }) + /// .detach(); + /// ``` + pub fn detach(self) { + let mut this = self; + let _out = this.set_detached(); + mem::forget(this); + } + + /// Cancels the task and waits for it to stop running. + /// + /// Returns the task's output if it was completed just before it got canceled, or [`None`] if + /// it didn't complete. + /// + /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of + /// canceling because it also waits for the task to stop running. + /// + /// # Examples + /// + /// ``` + /// # if cfg!(miri) { return; } // Miri does not support epoll + /// use smol::{future, Executor, Timer}; + /// use std::thread; + /// use std::time::Duration; + /// + /// let ex = Executor::new(); + /// + /// // Spawn a deamon future. + /// let task = ex.spawn(async { + /// loop { + /// println!("Even though I'm in an infinite loop, you can still cancel me!"); + /// Timer::after(Duration::from_secs(1)).await; + /// } + /// }); + /// + /// // Run an executor thread. + /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); + /// + /// future::block_on(async { + /// Timer::after(Duration::from_secs(3)).await; + /// task.cancel().await; + /// }); + /// ``` + pub async fn cancel(self) -> Option<T> { + let mut this = self; + this.set_canceled(); + this.fallible().await + } + + /// Converts this task into a [`FallibleTask`]. + /// + /// Like [`Task`], a fallible task will poll the task's output until it is + /// completed or cancelled due to its [`Runnable`][`super::Runnable`] being + /// dropped without being run. Resolves to the task's output when completed, + /// or [`None`] if it didn't complete. + /// + /// # Examples + /// + /// ``` + /// use smol::{future, Executor}; + /// use std::thread; + /// + /// let ex = Executor::new(); + /// + /// // Spawn a future onto the executor. + /// let task = ex.spawn(async { + /// println!("Hello from a task!"); + /// 1 + 2 + /// }) + /// .fallible(); + /// + /// // Run an executor thread. + /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); + /// + /// // Wait for the task's output. + /// assert_eq!(future::block_on(task), Some(3)); + /// ``` + /// + /// ``` + /// use smol::future; + /// + /// // Schedule function which drops the runnable without running it. + /// let schedule = move |runnable| drop(runnable); + /// + /// // Create a task with the future and the schedule function. + /// let (runnable, task) = async_task::spawn(async { + /// println!("Hello from a task!"); + /// 1 + 2 + /// }, schedule); + /// runnable.schedule(); + /// + /// // Wait for the task's output. + /// assert_eq!(future::block_on(task.fallible()), None); + /// ``` + pub fn fallible(self) -> FallibleTask<T> { + FallibleTask { task: self } + } + + /// Puts the task in canceled state. + fn set_canceled(&mut self) { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + + unsafe { + let mut state = (*header).state.load(Ordering::Acquire); + + loop { + // If the task has been completed or closed, it can't be canceled. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + + // If the task is not scheduled nor running, we'll need to schedule it. + let new = if state & (SCHEDULED | RUNNING) == 0 { + (state | SCHEDULED | CLOSED) + REFERENCE + } else { + state | CLOSED + }; + + // Mark the task as closed. + match (*header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If the task is not scheduled nor running, schedule it one more time so + // that its future gets dropped by the executor. + if state & (SCHEDULED | RUNNING) == 0 { + ((*header).vtable.schedule)(ptr); + } + + // Notify the awaiter that the task has been closed. + if state & AWAITER != 0 { + (*header).notify(None); + } + + break; + } + Err(s) => state = s, + } + } + } + } + + /// Puts the task in detached state. + fn set_detached(&mut self) -> Option<T> { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + + unsafe { + // A place where the output will be stored in case it needs to be dropped. + let mut output = None; + + // Optimistically assume the `Task` is being detached just after creating the task. + // This is a common case so if the `Task` is datached, the overhead of it is only one + // compare-exchange operation. + if let Err(mut state) = (*header).state.compare_exchange_weak( + SCHEDULED | TASK | REFERENCE, + SCHEDULED | REFERENCE, + Ordering::AcqRel, + Ordering::Acquire, + ) { + loop { + // If the task has been completed but not yet closed, that means its output + // must be dropped. + if state & COMPLETED != 0 && state & CLOSED == 0 { + // Mark the task as closed in order to grab its output. + match (*header).state.compare_exchange_weak( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Read the output. + output = + Some((((*header).vtable.get_output)(ptr) as *mut T).read()); + + // Update the state variable because we're continuing the loop. + state |= CLOSED; + } + Err(s) => state = s, + } + } else { + // If this is the last reference to the task and it's not closed, then + // close it and schedule one more time so that its future gets dropped by + // the executor. + let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { + SCHEDULED | CLOSED | REFERENCE + } else { + state & !TASK + }; + + // Unset the `TASK` flag. + match (*header).state.compare_exchange_weak( + state, + new, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // If this is the last reference to the task, we need to either + // schedule dropping its future or destroy it. + if state & !(REFERENCE - 1) == 0 { + if state & CLOSED == 0 { + ((*header).vtable.schedule)(ptr); + } else { + ((*header).vtable.destroy)(ptr); + } + } + + break; + } + Err(s) => state = s, + } + } + } + } + + output + } + } + + /// Polls the task to retrieve its output. + /// + /// Returns `Some` if the task has completed or `None` if it was closed. + /// + /// A task becomes closed in the following cases: + /// + /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. + /// 2. Its output gets awaited by the `Task`. + /// 3. It panics while polling the future. + /// 4. It is completed and the `Task` gets dropped. + fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + + unsafe { + let mut state = (*header).state.load(Ordering::Acquire); + + loop { + // If the task has been closed, notify the awaiter and return `None`. + if state & CLOSED != 0 { + // If the task is scheduled or running, we need to wait until its future is + // dropped. + if state & (SCHEDULED | RUNNING) != 0 { + // Replace the waker with one associated with the current task. + (*header).register(cx.waker()); + + // Reload the state after registering. It is possible changes occurred just + // before registration so we need to check for that. + state = (*header).state.load(Ordering::Acquire); + + // If the task is still scheduled or running, we need to wait because its + // future is not dropped yet. + if state & (SCHEDULED | RUNNING) != 0 { + return Poll::Pending; + } + } + + // Even though the awaiter is most likely the current task, it could also be + // another task. + (*header).notify(Some(cx.waker())); + return Poll::Ready(None); + } + + // If the task is not completed, register the current task. + if state & COMPLETED == 0 { + // Replace the waker with one associated with the current task. + (*header).register(cx.waker()); + + // Reload the state after registering. It is possible that the task became + // completed or closed just before registration so we need to check for that. + state = (*header).state.load(Ordering::Acquire); + + // If the task has been closed, restart. + if state & CLOSED != 0 { + continue; + } + + // If the task is still not completed, we're blocked on it. + if state & COMPLETED == 0 { + return Poll::Pending; + } + } + + // Since the task is now completed, mark it as closed in order to grab its output. + match (*header).state.compare_exchange( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Notify the awaiter. Even though the awaiter is most likely the current + // task, it could also be another task. + if state & AWAITER != 0 { + (*header).notify(Some(cx.waker())); + } + + // Take the output from the task. + let output = ((*header).vtable.get_output)(ptr) as *mut T; + return Poll::Ready(Some(output.read())); + } + Err(s) => state = s, + } + } + } + } + + fn header(&self) -> &Header { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + unsafe { &*header } + } + + /// Returns `true` if the current task is finished. + /// + /// Note that in a multithreaded environment, this task can change finish immediately after calling this function. + pub fn is_finished(&self) -> bool { + let ptr = self.ptr.as_ptr(); + let header = ptr as *const Header; + + unsafe { + let state = (*header).state.load(Ordering::Acquire); + state & (CLOSED | COMPLETED) != 0 + } + } +} + +impl<T> Drop for Task<T> { + fn drop(&mut self) { + self.set_canceled(); + self.set_detached(); + } +} + +impl<T> Future for Task<T> { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + match self.poll_task(cx) { + Poll::Ready(t) => Poll::Ready(t.expect("task has failed")), + Poll::Pending => Poll::Pending, + } + } +} + +impl<T> fmt::Debug for Task<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Task") + .field("header", self.header()) + .finish() + } +} + +/// A spawned task with a fallible response. +/// +/// This type behaves like [`Task`], however it produces an `Option<T>` when +/// polled and will return `None` if the executor dropped its +/// [`Runnable`][`super::Runnable`] without being run. +/// +/// This can be useful to avoid the panic produced when polling the `Task` +/// future if the executor dropped its `Runnable`. +#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] +pub struct FallibleTask<T> { + task: Task<T>, +} + +impl<T> FallibleTask<T> { + /// Detaches the task to let it keep running in the background. + /// + /// # Examples + /// + /// ``` + /// use smol::{Executor, Timer}; + /// use std::time::Duration; + /// + /// let ex = Executor::new(); + /// + /// // Spawn a deamon future. + /// ex.spawn(async { + /// loop { + /// println!("I'm a daemon task looping forever."); + /// Timer::after(Duration::from_secs(1)).await; + /// } + /// }) + /// .fallible() + /// .detach(); + /// ``` + pub fn detach(self) { + self.task.detach() + } + + /// Cancels the task and waits for it to stop running. + /// + /// Returns the task's output if it was completed just before it got canceled, or [`None`] if + /// it didn't complete. + /// + /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of + /// canceling because it also waits for the task to stop running. + /// + /// # Examples + /// + /// ``` + /// # if cfg!(miri) { return; } // Miri does not support epoll + /// use smol::{future, Executor, Timer}; + /// use std::thread; + /// use std::time::Duration; + /// + /// let ex = Executor::new(); + /// + /// // Spawn a deamon future. + /// let task = ex.spawn(async { + /// loop { + /// println!("Even though I'm in an infinite loop, you can still cancel me!"); + /// Timer::after(Duration::from_secs(1)).await; + /// } + /// }) + /// .fallible(); + /// + /// // Run an executor thread. + /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); + /// + /// future::block_on(async { + /// Timer::after(Duration::from_secs(3)).await; + /// task.cancel().await; + /// }); + /// ``` + pub async fn cancel(self) -> Option<T> { + self.task.cancel().await + } +} + +impl<T> Future for FallibleTask<T> { + type Output = Option<T>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + self.task.poll_task(cx) + } +} + +impl<T> fmt::Debug for FallibleTask<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FallibleTask") + .field("header", self.task.header()) + .finish() + } +} diff --git a/src/utils.rs b/src/utils.rs new file mode 100644 index 0000000..189e9af --- /dev/null +++ b/src/utils.rs @@ -0,0 +1,127 @@ +use core::alloc::Layout as StdLayout; +use core::mem; + +/// Aborts the process. +/// +/// To abort, this function simply panics while panicking. +pub(crate) fn abort() -> ! { + struct Panic; + + impl Drop for Panic { + fn drop(&mut self) { + panic!("aborting the process"); + } + } + + let _panic = Panic; + panic!("aborting the process"); +} + +/// Calls a function and aborts if it panics. +/// +/// This is useful in unsafe code where we can't recover from panics. +#[inline] +pub(crate) fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T { + struct Bomb; + + impl Drop for Bomb { + fn drop(&mut self) { + abort(); + } + } + + let bomb = Bomb; + let t = f(); + mem::forget(bomb); + t +} + +/// A version of `alloc::alloc::Layout` that can be used in the const +/// position. +#[derive(Clone, Copy, Debug)] +pub(crate) struct Layout { + size: usize, + align: usize, +} + +impl Layout { + /// Creates a new `Layout` with the given size and alignment. + #[inline] + pub(crate) const fn from_size_align(size: usize, align: usize) -> Self { + Self { size, align } + } + + /// Creates a new `Layout` for the given sized type. + #[inline] + pub(crate) const fn new<T>() -> Self { + Self::from_size_align(mem::size_of::<T>(), mem::align_of::<T>()) + } + + /// Convert this into the standard library's layout type. + /// + /// # Safety + /// + /// - `align` must be non-zero and a power of two. + /// - When rounded up to the nearest multiple of `align`, the size + /// must not overflow. + #[inline] + pub(crate) const unsafe fn into_std(self) -> StdLayout { + StdLayout::from_size_align_unchecked(self.size, self.align) + } + + /// Get the alignment of this layout. + #[inline] + pub(crate) const fn align(&self) -> usize { + self.align + } + + /// Get the size of this layout. + #[inline] + pub(crate) const fn size(&self) -> usize { + self.size + } + + /// Returns the layout for `a` followed by `b` and the offset of `b`. + /// + /// This function was adapted from the currently unstable `Layout::extend()`: + /// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.extend + #[inline] + pub(crate) const fn extend(self, other: Layout) -> Option<(Layout, usize)> { + let new_align = max(self.align(), other.align()); + let pad = self.padding_needed_for(other.align()); + + let offset = leap!(self.size().checked_add(pad)); + let new_size = leap!(offset.checked_add(other.size())); + + // return None if any of the following are true: + // - align is 0 (implied false by is_power_of_two()) + // - align is not a power of 2 + // - size rounded up to align overflows + if !new_align.is_power_of_two() || new_size > core::usize::MAX - (new_align - 1) { + return None; + } + + let layout = Layout::from_size_align(new_size, new_align); + Some((layout, offset)) + } + + /// Returns the padding after `layout` that aligns the following address to `align`. + /// + /// This function was adapted from the currently unstable `Layout::padding_needed_for()`: + /// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.padding_needed_for + #[inline] + pub(crate) const fn padding_needed_for(self, align: usize) -> usize { + let len = self.size(); + let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1); + len_rounded_up.wrapping_sub(len) + } +} + +#[inline] +pub(crate) const fn max(left: usize, right: usize) -> usize { + if left > right { + left + } else { + right + } +} diff --git a/tests/basic.rs b/tests/basic.rs new file mode 100644 index 0000000..c223043 --- /dev/null +++ b/tests/basic.rs @@ -0,0 +1,299 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll}; + +use async_task::Runnable; +use smol::future; + +// Creates a future with event counters. +// +// Usage: `future!(f, POLL, DROP)` +// +// The future `f` always returns `Poll::Ready`. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! future { + ($name:pat, $poll:ident, $drop:ident) => { + static $poll: AtomicUsize = AtomicUsize::new(0); + static $drop: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Fut(Box<i32>); + + impl Future for Fut { + type Output = Box<i32>; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + $poll.fetch_add(1, Ordering::SeqCst); + Poll::Ready(Box::new(0)) + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, SCHED, DROP)` +// +// The schedule function `s` does nothing. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! schedule { + ($name:pat, $sched:ident, $drop:ident) => { + static $drop: AtomicUsize = AtomicUsize::new(0); + static $sched: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + let guard = Guard(Box::new(0)); + move |_runnable| { + let _ = &guard; + $sched.fetch_add(1, Ordering::SeqCst); + } + }; + }; +} + +fn try_await<T>(f: impl Future<Output = T>) -> Option<T> { + future::block_on(future::poll_once(f)) +} + +#[test] +fn drop_and_detach() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + drop(runnable); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + task.detach(); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn detach_and_drop() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + task.detach(); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + drop(runnable); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn detach_and_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + task.detach(); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn run_and_detach() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + task.detach(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn cancel_and_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn run_and_cancel() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn cancel_join() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, mut task) = async_task::spawn(f, s); + + assert!(try_await(&mut task).is_none()); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + assert!(try_await(&mut task).is_some()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn schedule() { + let (s, r) = flume::unbounded(); + let schedule = move |runnable| s.send(runnable).unwrap(); + let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule); + + assert!(r.is_empty()); + runnable.schedule(); + + let runnable = r.recv().unwrap(); + assert!(r.is_empty()); + runnable.schedule(); + + let runnable = r.recv().unwrap(); + assert!(r.is_empty()); + runnable.schedule(); + + r.recv().unwrap(); +} + +#[test] +fn schedule_counter() { + static COUNT: AtomicUsize = AtomicUsize::new(0); + + let (s, r) = flume::unbounded(); + let schedule = move |runnable: Runnable| { + COUNT.fetch_add(1, Ordering::SeqCst); + s.send(runnable).unwrap(); + }; + let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule); + runnable.schedule(); + + r.recv().unwrap().schedule(); + r.recv().unwrap().schedule(); + assert_eq!(COUNT.load(Ordering::SeqCst), 3); + r.recv().unwrap(); +} + +#[test] +fn drop_inside_schedule() { + struct DropGuard(AtomicUsize); + impl Drop for DropGuard { + fn drop(&mut self) { + self.0.fetch_add(1, Ordering::SeqCst); + } + } + let guard = DropGuard(AtomicUsize::new(0)); + + let (runnable, _) = async_task::spawn(async {}, move |runnable| { + assert_eq!(guard.0.load(Ordering::SeqCst), 0); + drop(runnable); + assert_eq!(guard.0.load(Ordering::SeqCst), 0); + }); + runnable.schedule(); +} + +#[test] +fn waker() { + let (s, r) = flume::unbounded(); + let schedule = move |runnable| s.send(runnable).unwrap(); + let (runnable, _task) = async_task::spawn(future::poll_fn(|_| Poll::<()>::Pending), schedule); + + assert!(r.is_empty()); + let waker = runnable.waker(); + runnable.run(); + waker.wake_by_ref(); + + let runnable = r.recv().unwrap(); + runnable.run(); + waker.wake(); + r.recv().unwrap(); +} diff --git a/tests/cancel.rs b/tests/cancel.rs new file mode 100644 index 0000000..0fe7c72 --- /dev/null +++ b/tests/cancel.rs @@ -0,0 +1,183 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Runnable; +use easy_parallel::Parallel; +use smol::future; + +// Creates a future with event counters. +// +// Usage: `future!(f, POLL, DROP_F, DROP_T)` +// +// The future `f` outputs `Poll::Ready`. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP_F` is incremented. +// When the output gets dropped, `DROP_T` is incremented. +macro_rules! future { + ($name:pat, $poll:ident, $drop_f:ident, $drop_t:ident) => { + static $poll: AtomicUsize = AtomicUsize::new(0); + static $drop_f: AtomicUsize = AtomicUsize::new(0); + static $drop_t: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Fut(Box<i32>); + + impl Future for Fut { + type Output = Out; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + $poll.fetch_add(1, Ordering::SeqCst); + thread::sleep(ms(400)); + Poll::Ready(Out(Box::new(0), true)) + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop_f.fetch_add(1, Ordering::SeqCst); + } + } + + #[derive(Default)] + struct Out(Box<i32>, bool); + + impl Drop for Out { + fn drop(&mut self) { + if self.1 { + $drop_t.fetch_add(1, Ordering::SeqCst); + } + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, SCHED, DROP)` +// +// The schedule function `s` does nothing. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! schedule { + ($name:pat, $sched:ident, $drop:ident) => { + static $drop: AtomicUsize = AtomicUsize::new(0); + static $sched: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + let guard = Guard(Box::new(0)); + move |runnable: Runnable| { + let _ = &guard; + runnable.schedule(); + $sched.fetch_add(1, Ordering::SeqCst); + } + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn run_and_cancel() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + assert!(future::block_on(task.cancel()).is_some()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn cancel_and_run() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + thread::sleep(ms(200)); + runnable.run(); + + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + thread::sleep(ms(200)); + + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .add(|| { + assert!(future::block_on(task.cancel()).is_none()); + + thread::sleep(ms(200)); + + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + thread::sleep(ms(200)); + + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .run(); +} + +#[test] +fn cancel_during_run() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + runnable.run(); + + thread::sleep(ms(200)); + + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .add(|| { + thread::sleep(ms(200)); + + assert!(future::block_on(task.cancel()).is_none()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .run(); +} diff --git a/tests/join.rs b/tests/join.rs new file mode 100644 index 0000000..2ac7b8d --- /dev/null +++ b/tests/join.rs @@ -0,0 +1,386 @@ +use std::cell::Cell; +use std::future::Future; +use std::panic::{catch_unwind, AssertUnwindSafe}; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Runnable; +use easy_parallel::Parallel; +use smol::future; + +// Creates a future with event counters. +// +// Usage: `future!(f, POLL, DROP_F, DROP_T)` +// +// The future `f` outputs `Poll::Ready`. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP_F` is incremented. +// When the output gets dropped, `DROP_T` is incremented. +macro_rules! future { + ($name:pat, $poll:ident, $drop_f:ident, $drop_t:ident) => { + static $poll: AtomicUsize = AtomicUsize::new(0); + static $drop_f: AtomicUsize = AtomicUsize::new(0); + static $drop_t: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Fut(Box<i32>); + + impl Future for Fut { + type Output = Out; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + $poll.fetch_add(1, Ordering::SeqCst); + Poll::Ready(Out(Box::new(0), true)) + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop_f.fetch_add(1, Ordering::SeqCst); + } + } + + #[derive(Default)] + struct Out(Box<i32>, bool); + + impl Drop for Out { + fn drop(&mut self) { + if self.1 { + $drop_t.fetch_add(1, Ordering::SeqCst); + } + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, SCHED, DROP)` +// +// The schedule function `s` does nothing. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! schedule { + ($name:pat, $sched:ident, $drop:ident) => { + static $drop: AtomicUsize = AtomicUsize::new(0); + static $sched: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + let guard = Guard(Box::new(0)); + move |runnable: Runnable| { + let _ = &guard; + runnable.schedule(); + $sched.fetch_add(1, Ordering::SeqCst); + } + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn drop_and_join() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + drop(runnable); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + assert!(catch_unwind(|| future::block_on(task)).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); +} + +#[test] +fn run_and_join() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + runnable.run(); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + assert!(catch_unwind(|| future::block_on(task)).is_ok()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); +} + +#[test] +fn detach_and_run() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + task.detach(); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); +} + +#[test] +fn join_twice() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, mut task) = async_task::spawn(f, s); + + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + runnable.run(); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + future::block_on(&mut task); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + + assert!(catch_unwind(AssertUnwindSafe(|| future::block_on(&mut task))).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + + task.detach(); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn join_and_cancel() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + thread::sleep(ms(200)); + drop(runnable); + + thread::sleep(ms(400)); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .add(|| { + assert!(catch_unwind(|| future::block_on(task)).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + + thread::sleep(ms(200)); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .run(); +} + +#[test] +fn join_and_run() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + thread::sleep(ms(400)); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + + thread::sleep(ms(200)); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .add(|| { + future::block_on(task); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + + thread::sleep(ms(200)); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .run(); +} + +#[test] +fn try_join_and_run_and_join() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, mut task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + thread::sleep(ms(400)); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + + thread::sleep(ms(200)); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .add(|| { + future::block_on(future::or(&mut task, future::ready(Default::default()))); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + future::block_on(task); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + + thread::sleep(ms(200)); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .run(); +} + +#[test] +fn try_join_and_cancel_and_run() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, mut task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + thread::sleep(ms(200)); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .add(|| { + future::block_on(future::or(&mut task, future::ready(Default::default()))); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + }) + .run(); +} + +#[test] +fn try_join_and_run_and_cancel() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, mut task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + thread::sleep(ms(200)); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + }) + .add(|| { + future::block_on(future::or(&mut task, future::ready(Default::default()))); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + thread::sleep(ms(400)); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + }) + .run(); +} + +#[test] +fn await_output() { + struct Fut<T>(Cell<Option<T>>); + + impl<T> Fut<T> { + fn new(t: T) -> Fut<T> { + Fut(Cell::new(Some(t))) + } + } + + impl<T> Future for Fut<T> { + type Output = T; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + Poll::Ready(self.0.take().unwrap()) + } + } + + for i in 0..10 { + let (runnable, task) = async_task::spawn(Fut::new(i), drop); + runnable.run(); + assert_eq!(future::block_on(task), i); + } + + for i in 0..10 { + let (runnable, task) = async_task::spawn(Fut::new(vec![7; i]), drop); + runnable.run(); + assert_eq!(future::block_on(task), vec![7; i]); + } + + let (runnable, task) = async_task::spawn(Fut::new("foo".to_string()), drop); + runnable.run(); + assert_eq!(future::block_on(task), "foo"); +} diff --git a/tests/panic.rs b/tests/panic.rs new file mode 100644 index 0000000..09ffb28 --- /dev/null +++ b/tests/panic.rs @@ -0,0 +1,234 @@ +use std::future::Future; +use std::panic::catch_unwind; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Runnable; +use easy_parallel::Parallel; +use smol::future; + +// Creates a future with event counters. +// +// Usage: `future!(f, POLL, DROP)` +// +// The future `f` sleeps for 200 ms and then panics. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! future { + ($name:pat, $poll:ident, $drop:ident) => { + static $poll: AtomicUsize = AtomicUsize::new(0); + static $drop: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Fut(Box<i32>); + + impl Future for Fut { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + $poll.fetch_add(1, Ordering::SeqCst); + thread::sleep(ms(400)); + panic!() + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, SCHED, DROP)` +// +// The schedule function `s` does nothing. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! schedule { + ($name:pat, $sched:ident, $drop:ident) => { + static $drop: AtomicUsize = AtomicUsize::new(0); + static $sched: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + let guard = Guard(Box::new(0)); + move |_runnable: Runnable| { + let _ = &guard; + $sched.fetch_add(1, Ordering::SeqCst); + } + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn cancel_during_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + assert!(catch_unwind(|| runnable.run()).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .add(|| { + thread::sleep(ms(200)); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + }) + .run(); +} + +#[test] +fn run_and_join() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + assert!(catch_unwind(|| runnable.run()).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + assert!(catch_unwind(|| future::block_on(task)).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn try_join_and_run_and_join() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, mut task) = async_task::spawn(f, s); + + future::block_on(future::or(&mut task, future::ready(Default::default()))); + assert_eq!(POLL.load(Ordering::SeqCst), 0); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + assert!(catch_unwind(|| runnable.run()).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + assert!(catch_unwind(|| future::block_on(task)).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn join_during_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + assert!(catch_unwind(|| runnable.run()).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + + thread::sleep(ms(200)); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .add(|| { + thread::sleep(ms(200)); + + assert!(catch_unwind(|| future::block_on(task)).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + + thread::sleep(ms(200)); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .run(); +} + +#[test] +fn try_join_during_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, mut task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + assert!(catch_unwind(|| runnable.run()).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .add(|| { + thread::sleep(ms(200)); + + future::block_on(future::or(&mut task, future::ready(Default::default()))); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + drop(task); + }) + .run(); +} + +#[test] +fn detach_during_run() { + future!(f, POLL, DROP_F); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + assert!(catch_unwind(|| runnable.run()).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .add(|| { + thread::sleep(ms(200)); + + task.detach(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + }) + .run(); +} diff --git a/tests/ready.rs b/tests/ready.rs new file mode 100644 index 0000000..e345565 --- /dev/null +++ b/tests/ready.rs @@ -0,0 +1,225 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Runnable; +use easy_parallel::Parallel; +use smol::future; + +// Creates a future with event counters. +// +// Usage: `future!(f, POLL, DROP_F, DROP_T)` +// +// The future `f` sleeps for 200 ms and outputs `Poll::Ready`. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP_F` is incremented. +// When the output gets dropped, `DROP_T` is incremented. +macro_rules! future { + ($name:pat, $poll:ident, $drop_f:ident, $drop_t:ident) => { + static $poll: AtomicUsize = AtomicUsize::new(0); + static $drop_f: AtomicUsize = AtomicUsize::new(0); + static $drop_t: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Fut(Box<i32>); + + impl Future for Fut { + type Output = Out; + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> { + $poll.fetch_add(1, Ordering::SeqCst); + thread::sleep(ms(400)); + Poll::Ready(Out(Box::new(0), true)) + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop_f.fetch_add(1, Ordering::SeqCst); + } + } + + #[derive(Default)] + struct Out(Box<i32>, bool); + + impl Drop for Out { + fn drop(&mut self) { + if self.1 { + $drop_t.fetch_add(1, Ordering::SeqCst); + } + } + } + + Fut(Box::new(0)) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, SCHED, DROP)` +// +// The schedule function `s` does nothing. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +macro_rules! schedule { + ($name:pat, $sched:ident, $drop:ident) => { + static $drop: AtomicUsize = AtomicUsize::new(0); + static $sched: AtomicUsize = AtomicUsize::new(0); + + let $name = { + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + let guard = Guard(Box::new(0)); + move |_runnable: Runnable| { + let _ = &guard; + $sched.fetch_add(1, Ordering::SeqCst); + } + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn cancel_during_run() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + }) + .add(|| { + thread::sleep(ms(200)); + + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + + thread::sleep(ms(400)); + + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + }) + .run(); +} + +#[test] +fn join_during_run() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + + thread::sleep(ms(200)); + + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .add(|| { + thread::sleep(ms(200)); + + future::block_on(task); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + + thread::sleep(ms(200)); + + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + }) + .run(); +} + +#[test] +fn try_join_during_run() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, mut task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + }) + .add(|| { + thread::sleep(ms(200)); + + future::block_on(future::or(&mut task, future::ready(Default::default()))); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + drop(task); + }) + .run(); +} + +#[test] +fn detach_during_run() { + future!(f, POLL, DROP_F, DROP_T); + schedule!(s, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + Parallel::new() + .add(|| { + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(DROP_T.load(Ordering::SeqCst), 1); + }) + .add(|| { + thread::sleep(ms(200)); + + task.detach(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(DROP_T.load(Ordering::SeqCst), 0); + }) + .run(); +} diff --git a/tests/waker_panic.rs b/tests/waker_panic.rs new file mode 100644 index 0000000..7a7792e --- /dev/null +++ b/tests/waker_panic.rs @@ -0,0 +1,330 @@ +use std::cell::Cell; +use std::future::Future; +use std::panic::{catch_unwind, AssertUnwindSafe}; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Runnable; +use atomic_waker::AtomicWaker; +use easy_parallel::Parallel; +use smol::future; + +// Creates a future with event counters. +// +// Usage: `future!(f, get_waker, POLL, DROP)` +// +// The future `f` always sleeps for 200 ms, and panics the second time it is polled. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Every time the future is run, it stores the waker into a global variable. +// This waker can be extracted using the `get_waker()` function. +macro_rules! future { + ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => { + static $poll: AtomicUsize = AtomicUsize::new(0); + static $drop: AtomicUsize = AtomicUsize::new(0); + static WAKER: AtomicWaker = AtomicWaker::new(); + + let ($name, $get_waker) = { + struct Fut(Cell<bool>, Box<i32>); + + impl Future for Fut { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + WAKER.register(cx.waker()); + $poll.fetch_add(1, Ordering::SeqCst); + thread::sleep(ms(400)); + + if self.0.get() { + panic!() + } else { + self.0.set(true); + Poll::Pending + } + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + (Fut(Cell::new(false), Box::new(0)), || WAKER.take().unwrap()) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, chan, SCHED, DROP)` +// +// The schedule function `s` pushes the task into `chan`. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Receiver `chan` extracts the task when it is scheduled. +macro_rules! schedule { + ($name:pat, $chan:pat, $sched:ident, $drop:ident) => { + static $drop: AtomicUsize = AtomicUsize::new(0); + static $sched: AtomicUsize = AtomicUsize::new(0); + + let ($name, $chan) = { + let (s, r) = flume::unbounded(); + + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + let guard = Guard(Box::new(0)); + let sched = move |runnable: Runnable| { + let _ = &guard; + $sched.fetch_add(1, Ordering::SeqCst); + s.send(runnable).unwrap(); + }; + + (sched, r) + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +fn try_await<T>(f: impl Future<Output = T>) -> Option<T> { + future::block_on(future::poll_once(f)) +} + +#[test] +fn wake_during_run() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + let waker = get_waker(); + waker.wake_by_ref(); + let runnable = chan.recv().unwrap(); + + Parallel::new() + .add(|| { + assert!(catch_unwind(|| runnable.run()).is_err()); + drop(get_waker()); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .add(|| { + thread::sleep(ms(200)); + + waker.wake(); + task.detach(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(400)); + + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .run(); +} + +#[test] +fn cancel_during_run() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + let waker = get_waker(); + waker.wake(); + let runnable = chan.recv().unwrap(); + + Parallel::new() + .add(|| { + assert!(catch_unwind(|| runnable.run()).is_err()); + drop(get_waker()); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .add(|| { + thread::sleep(ms(200)); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(400)); + + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .run(); +} + +#[test] +fn wake_and_cancel_during_run() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + let waker = get_waker(); + waker.wake_by_ref(); + let runnable = chan.recv().unwrap(); + + Parallel::new() + .add(|| { + assert!(catch_unwind(|| runnable.run()).is_err()); + drop(get_waker()); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .add(|| { + thread::sleep(ms(200)); + + waker.wake(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(400)); + + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .run(); +} + +#[flaky_test::flaky_test] +fn cancel_and_wake_during_run() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + POLL.store(0, Ordering::SeqCst); + DROP_F.store(0, Ordering::SeqCst); + SCHEDULE.store(0, Ordering::SeqCst); + DROP_S.store(0, Ordering::SeqCst); + + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + let waker = get_waker(); + waker.wake_by_ref(); + let runnable = chan.recv().unwrap(); + + Parallel::new() + .add(|| { + assert!(catch_unwind(|| runnable.run()).is_err()); + drop(get_waker()); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .add(|| { + thread::sleep(ms(200)); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + waker.wake(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(400)); + + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .run(); +} + +#[test] +fn panic_and_poll() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + get_waker().wake(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + let mut task = task; + assert!(try_await(&mut task).is_none()); + + let runnable = chan.recv().unwrap(); + assert!(catch_unwind(|| runnable.run()).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + assert!(catch_unwind(AssertUnwindSafe(|| try_await(&mut task))).is_err()); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + + drop(get_waker()); + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} diff --git a/tests/waker_pending.rs b/tests/waker_pending.rs new file mode 100644 index 0000000..9c95cba --- /dev/null +++ b/tests/waker_pending.rs @@ -0,0 +1,365 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Runnable; +use atomic_waker::AtomicWaker; +use easy_parallel::Parallel; + +// Creates a future with event counters. +// +// Usage: `future!(f, get_waker, POLL, DROP)` +// +// The future `f` always sleeps for 200 ms and returns `Poll::Pending`. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Every time the future is run, it stores the waker into a global variable. +// This waker can be extracted using the `get_waker()` function. +macro_rules! future { + ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => { + static $poll: AtomicUsize = AtomicUsize::new(0); + static $drop: AtomicUsize = AtomicUsize::new(0); + static WAKER: AtomicWaker = AtomicWaker::new(); + + let ($name, $get_waker) = { + struct Fut(Box<i32>); + + impl Future for Fut { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + WAKER.register(cx.waker()); + $poll.fetch_add(1, Ordering::SeqCst); + thread::sleep(ms(400)); + Poll::Pending + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + (Fut(Box::new(0)), || WAKER.take().unwrap()) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, chan, SCHED, DROP)` +// +// The schedule function `s` pushes the task into `chan`. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Receiver `chan` extracts the task when it is scheduled. +macro_rules! schedule { + ($name:pat, $chan:pat, $sched:ident, $drop:ident) => { + static $drop: AtomicUsize = AtomicUsize::new(0); + static $sched: AtomicUsize = AtomicUsize::new(0); + + let ($name, $chan) = { + let (s, r) = flume::unbounded(); + + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + let guard = Guard(Box::new(0)); + let sched = move |runnable: Runnable| { + let _ = &guard; + $sched.fetch_add(1, Ordering::SeqCst); + s.send(runnable).unwrap(); + }; + + (sched, r) + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn wake_during_run() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, _task) = async_task::spawn(f, s); + + runnable.run(); + let waker = get_waker(); + waker.wake_by_ref(); + let runnable = chan.recv().unwrap(); + + Parallel::new() + .add(|| { + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 2); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 1); + }) + .add(|| { + thread::sleep(ms(200)); + + waker.wake_by_ref(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(400)); + + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 2); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 1); + }) + .run(); + + chan.recv().unwrap(); + drop(get_waker()); +} + +#[test] +fn cancel_during_run() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + let waker = get_waker(); + waker.wake(); + let runnable = chan.recv().unwrap(); + + Parallel::new() + .add(|| { + runnable.run(); + drop(get_waker()); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .add(|| { + thread::sleep(ms(200)); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(400)); + + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .run(); +} + +#[test] +fn wake_and_cancel_during_run() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + let waker = get_waker(); + waker.wake_by_ref(); + let runnable = chan.recv().unwrap(); + + Parallel::new() + .add(|| { + runnable.run(); + drop(get_waker()); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .add(|| { + thread::sleep(ms(200)); + + waker.wake(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(400)); + + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .run(); +} + +#[test] +fn cancel_and_wake_during_run() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + let waker = get_waker(); + waker.wake_by_ref(); + let runnable = chan.recv().unwrap(); + + Parallel::new() + .add(|| { + runnable.run(); + drop(get_waker()); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .add(|| { + thread::sleep(ms(200)); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + waker.wake(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + thread::sleep(ms(400)); + + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); + }) + .run(); +} + +#[test] +fn drop_last_waker() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + let waker = get_waker(); + + task.detach(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + drop(waker); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 1); + + chan.recv().unwrap().run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); +} + +#[test] +fn cancel_last_task() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + drop(get_waker()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + drop(task); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 1); + + chan.recv().unwrap().run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); +} + +#[test] +fn drop_last_task() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + + runnable.run(); + drop(get_waker()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + task.detach(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 1); + + chan.recv().unwrap().run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); +} diff --git a/tests/waker_ready.rs b/tests/waker_ready.rs new file mode 100644 index 0000000..10d38cb --- /dev/null +++ b/tests/waker_ready.rs @@ -0,0 +1,278 @@ +use std::cell::Cell; +use std::future::Future; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use async_task::Runnable; +use atomic_waker::AtomicWaker; + +// Creates a future with event counters. +// +// Usage: `future!(f, get_waker, POLL, DROP)` +// +// The future `f` always sleeps for 200 ms, and returns `Poll::Ready` the second time it is polled. +// When it gets polled, `POLL` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Every time the future is run, it stores the waker into a global variable. +// This waker can be extracted using the `get_waker()` function. +macro_rules! future { + ($name:pat, $get_waker:pat, $poll:ident, $drop:ident) => { + static $poll: AtomicUsize = AtomicUsize::new(0); + static $drop: AtomicUsize = AtomicUsize::new(0); + static WAKER: AtomicWaker = AtomicWaker::new(); + + let ($name, $get_waker) = { + struct Fut(Cell<bool>, Box<i32>); + + impl Future for Fut { + type Output = Box<i32>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + WAKER.register(cx.waker()); + $poll.fetch_add(1, Ordering::SeqCst); + thread::sleep(ms(200)); + + if self.0.get() { + Poll::Ready(Box::new(0)) + } else { + self.0.set(true); + Poll::Pending + } + } + } + + impl Drop for Fut { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + (Fut(Cell::new(false), Box::new(0)), || WAKER.take().unwrap()) + }; + }; +} + +// Creates a schedule function with event counters. +// +// Usage: `schedule!(s, chan, SCHED, DROP)` +// +// The schedule function `s` pushes the task into `chan`. +// When it gets invoked, `SCHED` is incremented. +// When it gets dropped, `DROP` is incremented. +// +// Receiver `chan` extracts the task when it is scheduled. +macro_rules! schedule { + ($name:pat, $chan:pat, $sched:ident, $drop:ident) => { + static $drop: AtomicUsize = AtomicUsize::new(0); + static $sched: AtomicUsize = AtomicUsize::new(0); + + let ($name, $chan) = { + let (s, r) = flume::unbounded(); + + struct Guard(Box<i32>); + + impl Drop for Guard { + fn drop(&mut self) { + $drop.fetch_add(1, Ordering::SeqCst); + } + } + + let guard = Guard(Box::new(0)); + let sched = move |runnable: Runnable| { + let _ = &guard; + $sched.fetch_add(1, Ordering::SeqCst); + s.send(runnable).unwrap(); + }; + + (sched, r) + }; + }; +} + +fn ms(ms: u64) -> Duration { + Duration::from_millis(ms) +} + +#[test] +fn wake() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (mut runnable, task) = async_task::spawn(f, s); + task.detach(); + + assert!(chan.is_empty()); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + get_waker().wake(); + runnable = chan.recv().unwrap(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + get_waker().wake(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); +} + +#[test] +fn wake_by_ref() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (mut runnable, task) = async_task::spawn(f, s); + task.detach(); + + assert!(chan.is_empty()); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + get_waker().wake_by_ref(); + runnable = chan.recv().unwrap(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + get_waker().wake_by_ref(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); +} + +#[test] +fn clone() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (mut runnable, task) = async_task::spawn(f, s); + task.detach(); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + let w2 = get_waker().clone(); + let w3 = w2.clone(); + let w4 = w3.clone(); + w4.wake(); + + runnable = chan.recv().unwrap(); + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + w3.wake(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + drop(w2); + drop(get_waker()); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); +} + +#[test] +fn wake_dropped() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + task.detach(); + + runnable.run(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + let waker = get_waker(); + + waker.wake_by_ref(); + drop(chan.recv().unwrap()); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + waker.wake(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); +} + +#[test] +fn wake_completed() { + future!(f, get_waker, POLL, DROP_F); + schedule!(s, chan, SCHEDULE, DROP_S); + let (runnable, task) = async_task::spawn(f, s); + task.detach(); + + runnable.run(); + let waker = get_waker(); + assert_eq!(POLL.load(Ordering::SeqCst), 1); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 0); + assert_eq!(DROP_F.load(Ordering::SeqCst), 0); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + waker.wake(); + chan.recv().unwrap().run(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 0); + assert_eq!(chan.len(), 0); + + get_waker().wake(); + assert_eq!(POLL.load(Ordering::SeqCst), 2); + assert_eq!(SCHEDULE.load(Ordering::SeqCst), 1); + assert_eq!(DROP_F.load(Ordering::SeqCst), 1); + assert_eq!(DROP_S.load(Ordering::SeqCst), 1); + assert_eq!(chan.len(), 0); +} |