summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/lib.rs843
1 files changed, 843 insertions, 0 deletions
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..24fbfac
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,843 @@
+//! Async executors.
+//!
+//! # Examples
+//!
+//! ```
+//! use async_executor::Executor;
+//! use futures_lite::future;
+//!
+//! // Create a new executor.
+//! let ex = Executor::new();
+//!
+//! // Spawn a task.
+//! let task = ex.spawn(async {
+//! println!("Hello world");
+//! });
+//!
+//! // Run the executor until the task completes.
+//! future::block_on(ex.run(task));
+//! ```
+
+#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
+
+use std::future::Future;
+use std::marker::PhantomData;
+use std::panic::{RefUnwindSafe, UnwindSafe};
+use std::rc::Rc;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::{Arc, Mutex, RwLock};
+use std::task::{Poll, Waker};
+
+use async_lock::OnceCell;
+use async_task::Runnable;
+use concurrent_queue::ConcurrentQueue;
+use futures_lite::{future, prelude::*};
+use slab::Slab;
+
+#[doc(no_inline)]
+pub use async_task::Task;
+
+/// An async executor.
+///
+/// # Examples
+///
+/// A multi-threaded executor:
+///
+/// ```
+/// use async_channel::unbounded;
+/// use async_executor::Executor;
+/// use easy_parallel::Parallel;
+/// use futures_lite::future;
+///
+/// let ex = Executor::new();
+/// let (signal, shutdown) = unbounded::<()>();
+///
+/// Parallel::new()
+/// // Run four executor threads.
+/// .each(0..4, |_| future::block_on(ex.run(shutdown.recv())))
+/// // Run the main future on the current thread.
+/// .finish(|| future::block_on(async {
+/// println!("Hello world!");
+/// drop(signal);
+/// }));
+/// ```
+#[derive(Debug)]
+pub struct Executor<'a> {
+ /// The executor state.
+ state: OnceCell<Arc<State>>,
+
+ /// Makes the `'a` lifetime invariant.
+ _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
+}
+
+unsafe impl Send for Executor<'_> {}
+unsafe impl Sync for Executor<'_> {}
+
+impl UnwindSafe for Executor<'_> {}
+impl RefUnwindSafe for Executor<'_> {}
+
+impl<'a> Executor<'a> {
+ /// Creates a new executor.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::Executor;
+ ///
+ /// let ex = Executor::new();
+ /// ```
+ pub const fn new() -> Executor<'a> {
+ Executor {
+ state: OnceCell::new(),
+ _marker: PhantomData,
+ }
+ }
+
+ /// Returns `true` if there are no unfinished tasks.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::Executor;
+ ///
+ /// let ex = Executor::new();
+ /// assert!(ex.is_empty());
+ ///
+ /// let task = ex.spawn(async {
+ /// println!("Hello world");
+ /// });
+ /// assert!(!ex.is_empty());
+ ///
+ /// assert!(ex.try_tick());
+ /// assert!(ex.is_empty());
+ /// ```
+ pub fn is_empty(&self) -> bool {
+ self.state().active.lock().unwrap().is_empty()
+ }
+
+ /// Spawns a task onto the executor.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::Executor;
+ ///
+ /// let ex = Executor::new();
+ ///
+ /// let task = ex.spawn(async {
+ /// println!("Hello world");
+ /// });
+ /// ```
+ pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
+ let mut active = self.state().active.lock().unwrap();
+
+ // Remove the task from the set of active tasks when the future finishes.
+ let index = active.vacant_entry().key();
+ let state = self.state().clone();
+ let future = async move {
+ let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
+ future.await
+ };
+
+ // Create the task and register it in the set of active tasks.
+ let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) };
+ active.insert(runnable.waker());
+
+ runnable.schedule();
+ task
+ }
+
+ /// Attempts to run a task if at least one is scheduled.
+ ///
+ /// Running a scheduled task means simply polling its future once.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::Executor;
+ ///
+ /// let ex = Executor::new();
+ /// assert!(!ex.try_tick()); // no tasks to run
+ ///
+ /// let task = ex.spawn(async {
+ /// println!("Hello world");
+ /// });
+ /// assert!(ex.try_tick()); // a task was found
+ /// ```
+ pub fn try_tick(&self) -> bool {
+ match self.state().queue.pop() {
+ Err(_) => false,
+ Ok(runnable) => {
+ // Notify another ticker now to pick up where this ticker left off, just in case
+ // running the task takes a long time.
+ self.state().notify();
+
+ // Run the task.
+ runnable.run();
+ true
+ }
+ }
+ }
+
+ /// Runs a single task.
+ ///
+ /// Running a task means simply polling its future once.
+ ///
+ /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::Executor;
+ /// use futures_lite::future;
+ ///
+ /// let ex = Executor::new();
+ ///
+ /// let task = ex.spawn(async {
+ /// println!("Hello world");
+ /// });
+ /// future::block_on(ex.tick()); // runs the task
+ /// ```
+ pub async fn tick(&self) {
+ let state = self.state();
+ let runnable = Ticker::new(state).runnable().await;
+ runnable.run();
+ }
+
+ /// Runs the executor until the given future completes.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::Executor;
+ /// use futures_lite::future;
+ ///
+ /// let ex = Executor::new();
+ ///
+ /// let task = ex.spawn(async { 1 + 2 });
+ /// let res = future::block_on(ex.run(async { task.await * 2 }));
+ ///
+ /// assert_eq!(res, 6);
+ /// ```
+ pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
+ let runner = Runner::new(self.state());
+
+ // A future that runs tasks forever.
+ let run_forever = async {
+ loop {
+ for _ in 0..200 {
+ let runnable = runner.runnable().await;
+ runnable.run();
+ }
+ future::yield_now().await;
+ }
+ };
+
+ // Run `future` and `run_forever` concurrently until `future` completes.
+ future.or(run_forever).await
+ }
+
+ /// Returns a function that schedules a runnable task when it gets woken up.
+ fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
+ let state = self.state().clone();
+
+ // TODO(stjepang): If possible, push into the current local queue and notify the ticker.
+ move |runnable| {
+ state.queue.push(runnable).unwrap();
+ state.notify();
+ }
+ }
+
+ /// Returns a reference to the inner state.
+ fn state(&self) -> &Arc<State> {
+ self.state.get_or_init_blocking(|| Arc::new(State::new()))
+ }
+}
+
+impl Drop for Executor<'_> {
+ fn drop(&mut self) {
+ if let Some(state) = self.state.get() {
+ let mut active = state.active.lock().unwrap();
+ for w in active.drain() {
+ w.wake();
+ }
+ drop(active);
+
+ while state.queue.pop().is_ok() {}
+ }
+ }
+}
+
+impl<'a> Default for Executor<'a> {
+ fn default() -> Executor<'a> {
+ Executor::new()
+ }
+}
+
+/// A thread-local executor.
+///
+/// The executor can only be run on the thread that created it.
+///
+/// # Examples
+///
+/// ```
+/// use async_executor::LocalExecutor;
+/// use futures_lite::future;
+///
+/// let local_ex = LocalExecutor::new();
+///
+/// future::block_on(local_ex.run(async {
+/// println!("Hello world!");
+/// }));
+/// ```
+#[derive(Debug)]
+pub struct LocalExecutor<'a> {
+ /// The inner executor.
+ inner: Executor<'a>,
+
+ /// Makes the type `!Send` and `!Sync`.
+ _marker: PhantomData<Rc<()>>,
+}
+
+impl UnwindSafe for LocalExecutor<'_> {}
+impl RefUnwindSafe for LocalExecutor<'_> {}
+
+impl<'a> LocalExecutor<'a> {
+ /// Creates a single-threaded executor.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::LocalExecutor;
+ ///
+ /// let local_ex = LocalExecutor::new();
+ /// ```
+ pub const fn new() -> LocalExecutor<'a> {
+ LocalExecutor {
+ inner: Executor::new(),
+ _marker: PhantomData,
+ }
+ }
+
+ /// Returns `true` if there are no unfinished tasks.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::LocalExecutor;
+ ///
+ /// let local_ex = LocalExecutor::new();
+ /// assert!(local_ex.is_empty());
+ ///
+ /// let task = local_ex.spawn(async {
+ /// println!("Hello world");
+ /// });
+ /// assert!(!local_ex.is_empty());
+ ///
+ /// assert!(local_ex.try_tick());
+ /// assert!(local_ex.is_empty());
+ /// ```
+ pub fn is_empty(&self) -> bool {
+ self.inner().is_empty()
+ }
+
+ /// Spawns a task onto the executor.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::LocalExecutor;
+ ///
+ /// let local_ex = LocalExecutor::new();
+ ///
+ /// let task = local_ex.spawn(async {
+ /// println!("Hello world");
+ /// });
+ /// ```
+ pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
+ let mut active = self.inner().state().active.lock().unwrap();
+
+ // Remove the task from the set of active tasks when the future finishes.
+ let index = active.vacant_entry().key();
+ let state = self.inner().state().clone();
+ let future = async move {
+ let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
+ future.await
+ };
+
+ // Create the task and register it in the set of active tasks.
+ let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) };
+ active.insert(runnable.waker());
+
+ runnable.schedule();
+ task
+ }
+
+ /// Attempts to run a task if at least one is scheduled.
+ ///
+ /// Running a scheduled task means simply polling its future once.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::LocalExecutor;
+ ///
+ /// let ex = LocalExecutor::new();
+ /// assert!(!ex.try_tick()); // no tasks to run
+ ///
+ /// let task = ex.spawn(async {
+ /// println!("Hello world");
+ /// });
+ /// assert!(ex.try_tick()); // a task was found
+ /// ```
+ pub fn try_tick(&self) -> bool {
+ self.inner().try_tick()
+ }
+
+ /// Runs a single task.
+ ///
+ /// Running a task means simply polling its future once.
+ ///
+ /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::LocalExecutor;
+ /// use futures_lite::future;
+ ///
+ /// let ex = LocalExecutor::new();
+ ///
+ /// let task = ex.spawn(async {
+ /// println!("Hello world");
+ /// });
+ /// future::block_on(ex.tick()); // runs the task
+ /// ```
+ pub async fn tick(&self) {
+ self.inner().tick().await
+ }
+
+ /// Runs the executor until the given future completes.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use async_executor::LocalExecutor;
+ /// use futures_lite::future;
+ ///
+ /// let local_ex = LocalExecutor::new();
+ ///
+ /// let task = local_ex.spawn(async { 1 + 2 });
+ /// let res = future::block_on(local_ex.run(async { task.await * 2 }));
+ ///
+ /// assert_eq!(res, 6);
+ /// ```
+ pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
+ self.inner().run(future).await
+ }
+
+ /// Returns a function that schedules a runnable task when it gets woken up.
+ fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
+ let state = self.inner().state().clone();
+
+ move |runnable| {
+ state.queue.push(runnable).unwrap();
+ state.notify();
+ }
+ }
+
+ /// Returns a reference to the inner executor.
+ fn inner(&self) -> &Executor<'a> {
+ &self.inner
+ }
+}
+
+impl<'a> Default for LocalExecutor<'a> {
+ fn default() -> LocalExecutor<'a> {
+ LocalExecutor::new()
+ }
+}
+
+/// The state of a executor.
+#[derive(Debug)]
+struct State {
+ /// The global queue.
+ queue: ConcurrentQueue<Runnable>,
+
+ /// Local queues created by runners.
+ local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
+
+ /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
+ notified: AtomicBool,
+
+ /// A list of sleeping tickers.
+ sleepers: Mutex<Sleepers>,
+
+ /// Currently active tasks.
+ active: Mutex<Slab<Waker>>,
+}
+
+impl State {
+ /// Creates state for a new executor.
+ fn new() -> State {
+ State {
+ queue: ConcurrentQueue::unbounded(),
+ local_queues: RwLock::new(Vec::new()),
+ notified: AtomicBool::new(true),
+ sleepers: Mutex::new(Sleepers {
+ count: 0,
+ wakers: Vec::new(),
+ free_ids: Vec::new(),
+ }),
+ active: Mutex::new(Slab::new()),
+ }
+ }
+
+ /// Notifies a sleeping ticker.
+ #[inline]
+ fn notify(&self) {
+ if self
+ .notified
+ .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
+ .is_ok()
+ {
+ let waker = self.sleepers.lock().unwrap().notify();
+ if let Some(w) = waker {
+ w.wake();
+ }
+ }
+ }
+}
+
+/// A list of sleeping tickers.
+#[derive(Debug)]
+struct Sleepers {
+ /// Number of sleeping tickers (both notified and unnotified).
+ count: usize,
+
+ /// IDs and wakers of sleeping unnotified tickers.
+ ///
+ /// A sleeping ticker is notified when its waker is missing from this list.
+ wakers: Vec<(usize, Waker)>,
+
+ /// Reclaimed IDs.
+ free_ids: Vec<usize>,
+}
+
+impl Sleepers {
+ /// Inserts a new sleeping ticker.
+ fn insert(&mut self, waker: &Waker) -> usize {
+ let id = match self.free_ids.pop() {
+ Some(id) => id,
+ None => self.count + 1,
+ };
+ self.count += 1;
+ self.wakers.push((id, waker.clone()));
+ id
+ }
+
+ /// Re-inserts a sleeping ticker's waker if it was notified.
+ ///
+ /// Returns `true` if the ticker was notified.
+ fn update(&mut self, id: usize, waker: &Waker) -> bool {
+ for item in &mut self.wakers {
+ if item.0 == id {
+ if !item.1.will_wake(waker) {
+ item.1 = waker.clone();
+ }
+ return false;
+ }
+ }
+
+ self.wakers.push((id, waker.clone()));
+ true
+ }
+
+ /// Removes a previously inserted sleeping ticker.
+ ///
+ /// Returns `true` if the ticker was notified.
+ fn remove(&mut self, id: usize) -> bool {
+ self.count -= 1;
+ self.free_ids.push(id);
+
+ for i in (0..self.wakers.len()).rev() {
+ if self.wakers[i].0 == id {
+ self.wakers.remove(i);
+ return false;
+ }
+ }
+ true
+ }
+
+ /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
+ fn is_notified(&self) -> bool {
+ self.count == 0 || self.count > self.wakers.len()
+ }
+
+ /// Returns notification waker for a sleeping ticker.
+ ///
+ /// If a ticker was notified already or there are no tickers, `None` will be returned.
+ fn notify(&mut self) -> Option<Waker> {
+ if self.wakers.len() == self.count {
+ self.wakers.pop().map(|item| item.1)
+ } else {
+ None
+ }
+ }
+}
+
+/// Runs task one by one.
+#[derive(Debug)]
+struct Ticker<'a> {
+ /// The executor state.
+ state: &'a State,
+
+ /// Set to a non-zero sleeper ID when in sleeping state.
+ ///
+ /// States a ticker can be in:
+ /// 1) Woken.
+ /// 2a) Sleeping and unnotified.
+ /// 2b) Sleeping and notified.
+ sleeping: AtomicUsize,
+}
+
+impl Ticker<'_> {
+ /// Creates a ticker.
+ fn new(state: &State) -> Ticker<'_> {
+ Ticker {
+ state,
+ sleeping: AtomicUsize::new(0),
+ }
+ }
+
+ /// Moves the ticker into sleeping and unnotified state.
+ ///
+ /// Returns `false` if the ticker was already sleeping and unnotified.
+ fn sleep(&self, waker: &Waker) -> bool {
+ let mut sleepers = self.state.sleepers.lock().unwrap();
+
+ match self.sleeping.load(Ordering::SeqCst) {
+ // Move to sleeping state.
+ 0 => self
+ .sleeping
+ .store(sleepers.insert(waker), Ordering::SeqCst),
+
+ // Already sleeping, check if notified.
+ id => {
+ if !sleepers.update(id, waker) {
+ return false;
+ }
+ }
+ }
+
+ self.state
+ .notified
+ .swap(sleepers.is_notified(), Ordering::SeqCst);
+
+ true
+ }
+
+ /// Moves the ticker into woken state.
+ fn wake(&self) {
+ let id = self.sleeping.swap(0, Ordering::SeqCst);
+ if id != 0 {
+ let mut sleepers = self.state.sleepers.lock().unwrap();
+ sleepers.remove(id);
+
+ self.state
+ .notified
+ .swap(sleepers.is_notified(), Ordering::SeqCst);
+ }
+ }
+
+ /// Waits for the next runnable task to run.
+ async fn runnable(&self) -> Runnable {
+ self.runnable_with(|| self.state.queue.pop().ok()).await
+ }
+
+ /// Waits for the next runnable task to run, given a function that searches for a task.
+ async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
+ future::poll_fn(|cx| {
+ loop {
+ match search() {
+ None => {
+ // Move to sleeping and unnotified state.
+ if !self.sleep(cx.waker()) {
+ // If already sleeping and unnotified, return.
+ return Poll::Pending;
+ }
+ }
+ Some(r) => {
+ // Wake up.
+ self.wake();
+
+ // Notify another ticker now to pick up where this ticker left off, just in
+ // case running the task takes a long time.
+ self.state.notify();
+
+ return Poll::Ready(r);
+ }
+ }
+ }
+ })
+ .await
+ }
+}
+
+impl Drop for Ticker<'_> {
+ fn drop(&mut self) {
+ // If this ticker is in sleeping state, it must be removed from the sleepers list.
+ let id = self.sleeping.swap(0, Ordering::SeqCst);
+ if id != 0 {
+ let mut sleepers = self.state.sleepers.lock().unwrap();
+ let notified = sleepers.remove(id);
+
+ self.state
+ .notified
+ .swap(sleepers.is_notified(), Ordering::SeqCst);
+
+ // If this ticker was notified, then notify another ticker.
+ if notified {
+ drop(sleepers);
+ self.state.notify();
+ }
+ }
+ }
+}
+
+/// A worker in a work-stealing executor.
+///
+/// This is just a ticker that also has an associated local queue for improved cache locality.
+#[derive(Debug)]
+struct Runner<'a> {
+ /// The executor state.
+ state: &'a State,
+
+ /// Inner ticker.
+ ticker: Ticker<'a>,
+
+ /// The local queue.
+ local: Arc<ConcurrentQueue<Runnable>>,
+
+ /// Bumped every time a runnable task is found.
+ ticks: AtomicUsize,
+}
+
+impl Runner<'_> {
+ /// Creates a runner and registers it in the executor state.
+ fn new(state: &State) -> Runner<'_> {
+ let runner = Runner {
+ state,
+ ticker: Ticker::new(state),
+ local: Arc::new(ConcurrentQueue::bounded(512)),
+ ticks: AtomicUsize::new(0),
+ };
+ state
+ .local_queues
+ .write()
+ .unwrap()
+ .push(runner.local.clone());
+ runner
+ }
+
+ /// Waits for the next runnable task to run.
+ async fn runnable(&self) -> Runnable {
+ let runnable = self
+ .ticker
+ .runnable_with(|| {
+ // Try the local queue.
+ if let Ok(r) = self.local.pop() {
+ return Some(r);
+ }
+
+ // Try stealing from the global queue.
+ if let Ok(r) = self.state.queue.pop() {
+ steal(&self.state.queue, &self.local);
+ return Some(r);
+ }
+
+ // Try stealing from other runners.
+ let local_queues = self.state.local_queues.read().unwrap();
+
+ // Pick a random starting point in the iterator list and rotate the list.
+ let n = local_queues.len();
+ let start = fastrand::usize(..n);
+ let iter = local_queues
+ .iter()
+ .chain(local_queues.iter())
+ .skip(start)
+ .take(n);
+
+ // Remove this runner's local queue.
+ let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
+
+ // Try stealing from each local queue in the list.
+ for local in iter {
+ steal(local, &self.local);
+ if let Ok(r) = self.local.pop() {
+ return Some(r);
+ }
+ }
+
+ None
+ })
+ .await;
+
+ // Bump the tick counter.
+ let ticks = self.ticks.fetch_add(1, Ordering::SeqCst);
+
+ if ticks % 64 == 0 {
+ // Steal tasks from the global queue to ensure fair task scheduling.
+ steal(&self.state.queue, &self.local);
+ }
+
+ runnable
+ }
+}
+
+impl Drop for Runner<'_> {
+ fn drop(&mut self) {
+ // Remove the local queue.
+ self.state
+ .local_queues
+ .write()
+ .unwrap()
+ .retain(|local| !Arc::ptr_eq(local, &self.local));
+
+ // Re-schedule remaining tasks in the local queue.
+ while let Ok(r) = self.local.pop() {
+ r.schedule();
+ }
+ }
+}
+
+/// Steals some items from one queue into another.
+fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
+ // Half of `src`'s length rounded up.
+ let mut count = (src.len() + 1) / 2;
+
+ if count > 0 {
+ // Don't steal more than fits into the queue.
+ if let Some(cap) = dest.capacity() {
+ count = count.min(cap - dest.len());
+ }
+
+ // Steal tasks.
+ for _ in 0..count {
+ if let Ok(t) = src.pop() {
+ assert!(dest.push(t).is_ok());
+ } else {
+ break;
+ }
+ }
+ }
+}
+
+/// Runs a closure when dropped.
+struct CallOnDrop<F: Fn()>(F);
+
+impl<F: Fn()> Drop for CallOnDrop<F> {
+ fn drop(&mut self) {
+ (self.0)();
+ }
+}