Diffstat (limited to 'gee/hazardpointer.vala')
-rw-r--r-- | gee/hazardpointer.vala | 768 |
1 file changed, 768 insertions, 0 deletions
diff --git a/gee/hazardpointer.vala b/gee/hazardpointer.vala
new file mode 100644
index 0000000..3151632
--- /dev/null
+++ b/gee/hazardpointer.vala
@@ -0,0 +1,768 @@
+/* hazardpointer.vala
+ *
+ * Copyright (C) 2011 Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Author:
+ *     Maciej Piechotka <uzytkownik2@gmail.com>
+ */
+
+/**
+ * A hazard pointer is a method of protecting a pointer shared by many threads.
+ * If you want to use an atomic pointer that may be freed, use the following code:
+ *
+ * {{{
+ *   string *shared_pointer = ...;
+ *   HazardPointer<string> hptr = HazardPointer.get_hazard_pointer (&shared_pointer);
+ *   // my_string contains the value from shared_pointer. It is valid as long as hptr is alive.
+ *   unowned string my_string = hptr.get ();
+ *   // instead of delete
+ *   hptr.release ((ptr) => {string *sptr = ptr; string str = (owned) sptr;});
+ * }}}
+ *
+ * In some cases you may use helper methods, which might involve copying the object (and are unsafe for unowned objects):
+ * {{{
+ *   Gtk.Window *window = ...;
+ *   Gtk.Window? local_window = HazardPointer.get_pointer (&window);
+ *   HazardPointer.set_pointer (&window, ...);
+ *   local_window = HazardPointer.exchange_pointer (&window, null);
+ *   HazardPointer.compare_and_exchange_pointer (&window, null, local_window);
+ * }}}
+ *
+ * The class also provides helper methods for the case where the least significant bits are used for storing flags.
+ *
+ * HazardPointers are not thread-safe (unless the documentation states otherwise).
+ */
+[Compact]
+public class Gee.HazardPointer<G> { // FIXME: Make it a struct
+    /**
+     * Creates a hazard pointer for a pointer.
+     *
+     * @param ptr Protected pointer
+     */
+    public HazardPointer (G *ptr) {
+        this._node = acquire ();
+        this._node.set ((void *)ptr);
+    }
+
+    /**
+     * Creates a hazard pointer from a Node.
+     */
+    internal HazardPointer.from_node (Node node) {
+        this._node = node;
+    }
+
+    /**
+     * Gets a hazard pointer from an atomic pointer safely.
+     *
+     * @param aptr Atomic pointer.
+     * @param mask Mask of bits.
+     * @param mask_out Result of applying the mask.
+     * @return Hazard pointer containing the element.
+     */
+    public static HazardPointer<G>? get_hazard_pointer<G> (G **aptr, size_t mask = 0, out size_t mask_out = null) {
+        unowned Node node = acquire ();
+        void *rptr = null;
+        void *ptr = null;
+        mask_out = 0;
+        do {
+            rptr = AtomicPointer.get ((void **)aptr);
+            ptr = (void *)((size_t) rptr & ~mask);
+            mask_out = (size_t) rptr & mask;
+            node.set (ptr);
+        } while (rptr != AtomicPointer.get ((void **)aptr));
+        if (ptr != null) {
+            return new HazardPointer<G>.from_node (node);
+        } else {
+            node.release ();
+            return null;
+        }
+    }
+
+    /**
+     * Copies an object from an atomic pointer.
+     *
+     * @param aptr Atomic pointer.
+     * @param mask Mask of flags.
+     * @param mask_out Result of applying the mask.
+     * @return A copy of the object from the atomic pointer.
+     */
+    public static G? get_pointer<G> (G **aptr, size_t mask = 0, out size_t mask_out = null) {
+        unowned Node node = acquire ();
+        void *rptr = null;
+        void *ptr = null;
+        mask_out = 0;
+        do {
+            rptr = AtomicPointer.get ((void **)aptr);
+            ptr = (void *)((size_t) rptr & ~mask);
+            mask_out = (size_t) rptr & mask;
+            node.set (ptr);
+        } while (rptr != AtomicPointer.get ((void **)aptr));
+        G? res = (G *)ptr;
+        node.release ();
+        return res;
+    }
+
+    /**
+     * Exchanges objects safely.
+     *
+     * @param aptr Atomic pointer.
+     * @param new_ptr New value.
+     * @param mask Mask of flags.
+     * @param new_mask New mask.
+     * @param old_mask Previous mask.
+     * @return Hazard pointer containing the old value.
+     */
+    public static HazardPointer<G>? exchange_hazard_pointer<G> (G **aptr, owned G? new_ptr, size_t mask = 0, size_t new_mask = 0, out size_t old_mask = null) {
+        unowned Node? new_node = null;
+        if (new_ptr != null) {
+            new_node = acquire ();
+            new_node.set (new_ptr);
+        }
+        old_mask = 0;
+        void *new_rptr = (void *)((size_t)((owned) new_ptr) | (mask & new_mask));
+        unowned Node node = acquire ();
+        void *rptr = null;
+        void *ptr = null;
+        do {
+            rptr = AtomicPointer.get ((void **)aptr);
+            ptr = (void *)((size_t) rptr & ~mask);
+            old_mask = (size_t) rptr & mask;
+            node.set (ptr);
+        } while (!AtomicPointer.compare_and_exchange ((void **)aptr, rptr, new_rptr));
+        if (new_node != null)
+            new_node.release ();
+        if (ptr != null) {
+            return new HazardPointer<G>.from_node (node);
+        } else {
+            node.release ();
+            return null;
+        }
+    }
+
+    /**
+     * Sets an object safely.
+     *
+     * @param aptr Atomic pointer.
+     * @param new_ptr New value.
+     * @param mask Mask of flags.
+     * @param new_mask New mask.
+     */
+    public static void set_pointer<G> (G **aptr, owned G? new_ptr, size_t mask = 0, size_t new_mask = 0) {
+        HazardPointer<G>? ptr = exchange_hazard_pointer<G> (aptr, new_ptr, mask, new_mask, null);
+        if (ptr != null) {
+            DestroyNotify<G> notify = get_destroy_notify<G> ();
+            ptr.release ((owned)notify);
+        }
+    }
+
+    /**
+     * Exchanges objects safely.
+     *
+     * @param aptr Atomic pointer.
+     * @param new_ptr New value.
+     * @param mask Mask of flags.
+     * @param new_mask New mask.
+     * @param old_mask Previous mask.
+     * @return Value that was previously stored.
+     */
+    public static G? exchange_pointer<G> (G **aptr, owned G? new_ptr, size_t mask = 0, size_t new_mask = 0, out size_t old_mask = null) {
+        HazardPointer<G>? ptr = exchange_hazard_pointer<G> (aptr, new_ptr, mask, new_mask, out old_mask);
+        G? rptr = ptr != null ? ptr.get () : null;
+        return rptr;
+    }
+
+    /**
+     * Compares and exchanges objects.
+     *
+     * @param aptr Atomic pointer.
+     * @param old_ptr Old pointer.
+     * @param _new_ptr New value.
+     * @param mask Mask of flags.
+     * @param old_mask Old mask.
+     * @param new_mask New mask.
+     * @return ``true`` if the value was exchanged, ``false`` otherwise.
+     */
+    public static bool compare_and_exchange_pointer<G> (G **aptr, G? old_ptr, owned G? _new_ptr, size_t mask = 0, size_t old_mask = 0, size_t new_mask = 0) {
+        G *new_ptr = (owned)_new_ptr;
+        void *new_rptr = (void *)((size_t)(new_ptr) | (mask & new_mask));
+        void *old_rptr = (void *)((size_t)(old_ptr) | (mask & old_mask));
+        bool success = AtomicPointer.compare_and_exchange ((void **)aptr, old_rptr, new_rptr);
+        if (success) {
+            DestroyNotify<G> notify = get_destroy_notify<G> ();
+            if (old_ptr != null) {
+                Context.get_current_context ()->release_ptr (old_ptr, (owned)notify);
+            }
+        } else if (new_ptr != null) {
+            _new_ptr = (owned)new_ptr;
+        }
+        return success;
+    }
+
+    ~HazardPointer () {
+        _node.release ();
+    }
+
+    /**
+     * Gets the pointer held by the hazard pointer.
+     *
+     * @param other_thread Has to be set to ``true`` if accessed from a thread that did not create this hazard pointer.
+     * @return The value held by the pointer.
+     */
+    public inline new unowned G get (bool other_thread = false) {
+        return _node[other_thread];
+    }
+
+    /**
+     * Frees the pointer.
+     *
+     * @param notify Method freeing the object.
+     */
+    public void release (owned DestroyNotify notify) {
+        unowned G item = _node[false];
+        _node.set (null);
+        if (item != null) {
+            Context.get_current_context ()->release_ptr (item, (owned)notify);
+        }
+    }
+
+    /**
+     * Sets the default policy (i.e. the default policy for user-created contexts).
+     * The policy must be concrete and should not be blocking.
+     *
+     * @param policy New default policy.
+     */
+    public static void set_default_policy (Policy policy) requires (policy.is_concrete ()) {
+        if (policy.is_blocking ())
+            warning ("Setting blocking default Gee.HazardPointer.Policy (there may be a deadlock).\n");
+        AtomicInt.set (ref _default_policy, (int)policy);
+    }
+
+    /**
+     * Sets the thread exit policy (i.e. the default policy for the top-most Context).
+     * The policy must be concrete and should not be unsafe.
+     *
+     * @param policy New thread policy.
+     */
+    public static void set_thread_exit_policy (Policy policy) requires (policy.is_concrete ()) {
+        if (!policy.is_safe ())
+            warning ("Setting unsafe global thread-exit Gee.HazardPointer.Policy (there may be a memory leak).\n");
+        AtomicInt.set (ref _thread_exit_policy, (int)policy);
+    }
+
+    /**
+     * Sets the release policy (i.e. how exactly the released objects are freed).
+     *
+     * The policy can only be set before any object is released, and setting it is not thread-safe.
+     *
+     * @param policy New release policy.
+     * @return ``true`` if the policy was set, ``false`` if the helper is already running or a concurrent change was detected.
+     */
+    public static bool set_release_policy (ReleasePolicy policy) {
+        int old_policy = AtomicInt.get (ref release_policy);
+        if ((old_policy & (sizeof(int) * 8 - 1)) != 0) {
+            critical ("Attempt to change the policy of a running helper. Failing.");
+            return false;
+        }
+        if (!AtomicInt.compare_and_exchange (ref release_policy, old_policy, (int)policy)) {
+            critical ("Concurrent access to release policy detected. Failing.");
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Policy determines what happens on exit from a Context.
+     */
+    public enum Policy {
+        /**
+         * Performs the default action on exit from a thread.
+         */
+        DEFAULT,
+        /**
+         * Performs the same action as on exit from the current thread.
+         */
+        THREAD_EXIT,
+        /**
+         * Goes through the free list and attempts to free un-freed elements.
+         */
+        TRY_FREE,
+        /**
+         * Goes through the free list and attempts to free un-freed elements
+         * until all elements are freed.
+         */
+        FREE,
+        /**
+         * Releases the un-freed elements to either the helper thread or the main loop.
+         * Please note that if the operation would block it is not performed.
+         */
+        TRY_RELEASE,
+        /**
+         * Releases the un-freed elements to either the helper thread or the main loop.
+         * Please note that it may block while adding to the queue.
+         */
+        RELEASE;
+
+        /**
+         * Checks if the policy is concrete or if it depends on global variables.
+         *
+         * @return ``true`` if this policy does not depend on global variables.
+         */
+        public bool is_concrete () {
+            switch (this) {
+            case DEFAULT:
+            case THREAD_EXIT:
+                return false;
+            case TRY_FREE:
+            case FREE:
+            case TRY_RELEASE:
+            case RELEASE:
+                return true;
+            default:
+                assert_not_reached ();
+            }
+        }
+
+        /**
+         * Checks if the policy blocks or is lock-free.
+         * Please note that it works on a concrete policy only.
+         *
+         * @return ``true`` if the policy may block the thread.
+         */
+        public bool is_blocking () requires (this.is_concrete ()) {
+            switch (this) {
+            case TRY_FREE:
+            case TRY_RELEASE:
+                return false;
+            case FREE:
+            case RELEASE:
+                return true;
+            default:
+                assert_not_reached ();
+            }
+        }
+
+        /**
+         * Checks if the policy guarantees freeing all elements.
+         * Please note that it works on a concrete policy only.
+         *
+         * @return ``true`` if the policy guarantees freeing all elements.
+         */
+        public bool is_safe () requires (this.is_concrete ()) {
+            switch (this) {
+            case TRY_FREE:
+            case TRY_RELEASE:
+                return false;
+            case FREE:
+            case RELEASE:
+                return true;
+            default:
+                assert_not_reached ();
+            }
+        }
+
+        /**
+         * Finds the concrete policy which corresponds to the given policy.
+         *
+         * @return Policy that corresponds to the given policy at the given time in the given thread.
+         */
+        public Policy to_concrete () ensures (result.is_concrete ()) {
+            switch (this) {
+            case TRY_FREE:
+            case FREE:
+            case TRY_RELEASE:
+            case RELEASE:
+                return this;
+            case DEFAULT:
+                return (Policy) AtomicInt.get (ref _default_policy);
+            case THREAD_EXIT:
+                return (Policy) AtomicInt.get (ref _thread_exit_policy);
+            default:
+                assert_not_reached ();
+            }
+        }
+
+        /**
+         * Runs the policy.
+         *
+         * @param to_free List containing elements to free.
+         * @return Non-empty list of elements that have not been freed, or ``null`` if all elements have been disposed.
+         */
+        internal ArrayList<FreeNode *>? perform (owned ArrayList<FreeNode *> to_free) {
+            switch (this.to_concrete ()) {
+            case TRY_FREE:
+                return try_free (to_free) ? (owned) to_free : null;
+            case FREE:
+                while (try_free (to_free)) {
+                    Thread.yield ();
+                }
+                return null;
+            case TRY_RELEASE:
+                ReleasePolicy.ensure_start ();
+                if (_queue_mutex.trylock ()) {
+                    _queue.offer ((owned) to_free);
+                    _queue_mutex.unlock ();
+                    return null;
+                } else {
+                    return (owned) to_free;
+                }
+            case RELEASE:
+                ReleasePolicy.ensure_start ();
+                _queue_mutex.lock ();
+                _queue.offer ((owned) to_free);
+                _queue_mutex.unlock ();
+                return null;
+            default:
+                assert_not_reached ();
+            }
+        }
+    }
+
+    public delegate void DestroyNotify (void *ptr);
+
+    /**
+     * Release policy determines what happens with objects freed by Policy.TRY_RELEASE
+     * and Policy.RELEASE.
+     */
+    public enum ReleasePolicy {
+        /**
+         * Libgee spawns a helper thread to free those elements.
+         * This is the default.
+         */
+        HELPER_THREAD,
+        /**
+         * Libgee uses the GLib main loop.
+         * This is recommended for applications using the GLib main loop.
+         */
+        MAIN_LOOP;
+
+        private static void start (ReleasePolicy self) { // FIXME: Make it non-static [bug 659778]
+            switch (self) {
+            case HELPER_THREAD:
+                Thread.create<bool> (() => {
+                    Thread.self<bool> ().set_priority (ThreadPriority.LOW);
+                    while (true) {
+                        Thread.yield ();
+                        attempt_free ();
+                    }
+                }, false);
+                break;
+            case MAIN_LOOP:
+                Idle.add (() => {
+                    attempt_free ();
+                    return true;
+                }, Priority.LOW);
+                break;
+            default:
+                assert_not_reached ();
+            }
+        }
+
+        /**
+         * Ensures that the helper methods are started.
+         */
+        internal static inline void ensure_start () {
+            int policy = AtomicInt.get (ref release_policy);
+            if ((policy & (1 << (sizeof(int) * 8 - 1))) != 0)
+                return;
+            if (_queue_mutex.trylock ()) {
+                policy = AtomicInt.get (ref release_policy);
+                if ((policy & (1 << (sizeof(int) * 8 - 1))) == 0) {
+                    _queue = new LinkedList<ArrayList<FreeNode *>> ();
+                    // Hack to not lie about successfully setting the policy
+                    policy = AtomicInt.exchange_and_add (ref release_policy, (int)(1 << (sizeof(int) * 8 - 1)));
+                    start ((ReleasePolicy) policy);
+                }
+                _queue_mutex.unlock ();
+            }
+        }
+
+        private static inline void attempt_free () {
+            if (_queue_mutex.trylock ()) {
+                Collection<ArrayList<FreeNode *>> temp = new ArrayList<ArrayList<FreeNode *>> ();
+                _queue.drain (temp);
+                _queue_mutex.unlock ();
+                temp.foreach ((x) => {_global_to_free.add_all (x); return true;});
+            }
+            try_free (_global_to_free);
+        }
+    }
+
+    /**
+     * Creates a new context. The user does not need to create one explicitly; however,
+     * when a bunch of operations is about to be issued, it might be beneficial to fine-tune
+     * the creation of contexts:
+     *
+     * {{{
+     *   Context ctx = new Context ();
+     *   lock_free_collection.operation1 ();
+     *   // Normally on exit from the thread the thread-exit operation would be executed,
+     *   // but here the default operation of the child context is executed.
+     *   lock_free_collection.operation2 ();
+     * }}}
+     *
+     * Please note that the Context is implicitly part of the stack and:
+     *
+     * 1. It cannot be moved between threads.
+     * 2. If in a given thread a child (created later) context is alive, the parent must be alive as well.
+     */
+    [Compact]
+    public class Context { // FIXME: Should be struct
+        public Context (Policy? policy = null) {
+            this._to_free = new ArrayList<FreeNode *> ();
+            this._parent = _current_context.get ();
+            _current_context.set (this, null);
+            if (policy == null) {
+                if (_parent == null) {
+                    _policy = (Policy)AtomicInt.get (ref _thread_exit_policy);
+                } else {
+                    _policy = (Policy)AtomicInt.get (ref _default_policy);
+                }
+            } else {
+                this._policy = policy.to_concrete ();
+            }
+#if DEBUG
+            stderr.printf ("Entering context %p (policy %s, parent %p)\n", this, _policy != null ? _policy.to_string () : null, _parent);
+#endif
+        }
+
+        ~Context () {
+#if DEBUG
+            stderr.printf ("Exiting context %p (policy %s, parent %p)\n", this, _policy != null ? _policy.to_string () : null, _parent);
+#endif
+            int size = _to_free.size;
+            bool clean_parent = false;
+            if (size > 0) {
+                ArrayList<FreeNode *>? remaining;
+                if (_parent == null || size >= THRESHOLD)
+                    remaining = _policy.perform ((owned) _to_free);
+                else
+                    remaining = (owned) _to_free;
+                if (remaining != null) {
+                    assert (_parent != null);
+                    _parent->_to_free.add_all (remaining);
+                    clean_parent = true;
+                }
+            }
+#if DEBUG
+            stderr.printf ("Setting current context to %p\n", _parent);
+#endif
+            _current_context.set (_parent, null);
+            if (clean_parent)
+                HazardPointer.try_free (_parent->_to_free);
+        }
+
+        /**
+         * Tries to free all released pointers in the current context.
+         */
+        public void try_free () {
+            HazardPointer.try_free (_to_free);
+        }
+
+        /**
+         * Ensures that the whole context is freed. Please note that it might block.
+         */
+        public void free_all () {
+            while (HazardPointer.try_free (_to_free))
+                Thread.yield ();
+        }
+
+        /**
+         * Tries to push the current context to the releaser.
+         */
+        public void try_release () {
+            if (_queue_mutex.trylock ()) {
+                _queue.offer ((owned) _to_free);
+                _to_free = new ArrayList<FreeNode *> ();
+                _queue_mutex.unlock ();
+            }
+        }
+
+        /**
+         * Pushes the current context to the releaser. Please note that it might block.
+         */
+        public void release () {
+            _queue_mutex.lock ();
+            _queue.offer ((owned) _to_free);
+            _to_free = new ArrayList<FreeNode *> ();
+            _queue_mutex.unlock ();
+        }
+
+        /**
+         * Adds a pointer to the to-free array.
+         */
+        internal inline void release_ptr (void *ptr, owned DestroyNotify notify) {
+            FreeNode *node = new FreeNode ();
+            node->pointer = ptr;
+            node->destroy_notify = (owned)notify;
+            _to_free.add (node);
+            if (_to_free.size >= THRESHOLD)
+                HazardPointer.try_free (_to_free);
+        }
+
+        /**
+         * Gets the current context.
+         */
+        internal inline static Context *get_current_context () {
+            return _current_context.get ();
+        }
+
+        internal Context *_parent;
+        internal ArrayList<FreeNode *> _to_free;
+        internal Policy? _policy;
+        internal static StaticPrivate _current_context;
+        internal static StaticPrivate _root_context;
+        private static uint THRESHOLD = 10;
+    }
+
+    /**
+     * Gets a new hazard pointer node.
+     *
+     * @return A new hazard pointer node.
+     */
+    internal static inline unowned Node acquire () {
+        for (unowned Node? curr = get_head (); curr != null; curr = curr.get_next ())
+            if (curr.activate ())
+                return curr;
+        Node *node = new Node ();
+        Node *old_head = null;
+        do {
+            node->set_next (old_head = (Node *)AtomicPointer.get (&_head));
+        } while (!AtomicPointer.compare_and_exchange (&_head, old_head, node));
+        return node;
+    }
+
+    /**
+     * Tries to free elements from the list.
+     *
+     * @return ``true`` if some elements could not be freed and remain in the list.
+     */
+    internal static bool try_free (ArrayList<FreeNode *> to_free) {
+        Collection<void *> used = new HashSet<void *> ();
+        for (unowned Node? current = get_head (); current != null; current = current.get_next ()) {
+            used.add (current.get ());
+        }
+        for (int i = 0; i < to_free.size;) {
+            FreeNode *current = to_free[i];
+            if (used.contains (current->pointer)) {
+#if DEBUG
+                stderr.printf ("Skipping freeing %p\n", current->pointer);
+#endif
+                i++;
+            } else {
+#if DEBUG
+                stderr.printf ("Freeing %p\n", current->pointer);
+#endif
+                FreeNode *cur = to_free.remove_at (to_free.size - 1);
+                if (i != to_free.size) {
+                    FreeNode *temp = to_free[i];
+                    to_free[i] = cur;
+                    cur = temp;
+                }
+                cur->destroy_notify (cur->pointer);
+                delete cur;
+            }
+        }
+        return to_free.size > 0;
+    }
+
+    /**
+     * Gets the head of the hazard pointer list.
+     *
+     * @return Hazard pointer head.
+     */
+    internal static unowned Node? get_head () {
+        return (Node *)AtomicPointer.get (&_head);
+    }
+
+    internal unowned Node _node;
+
+    internal static Node *_head = null;
+
+    internal static int _default_policy = (int)Policy.TRY_FREE;
+    internal static int _thread_exit_policy = (int)Policy.RELEASE;
+
+    internal static int release_policy = 0;
+
+    internal static Queue<ArrayList<FreeNode *>> _queue;
+    internal static StaticMutex _queue_mutex;
+
+    internal static ArrayList<FreeNode *> _global_to_free;
+
+    internal static DestroyNotify get_destroy_notify<G> () {
+        return (ptr) => {
+            G *gptr = ptr;
+            G obj = (owned)gptr;
+            obj = null;
+        };
+    }
+
+    [Compact]
+    internal class FreeNode {
+        public void *pointer;
+        public DestroyNotify destroy_notify;
+    }
+
+    /**
+     * List of used pointers.
+     */
+    [Compact]
+    internal class Node {
+        public Node () {
+            AtomicPointer.set (&_hazard, null);
+            AtomicInt.set (ref _active, 1);
+        }
+
+        inline ~Node () {
+            delete _next;
+        }
+
+        public void release () {
+            AtomicPointer.set (&_hazard, null);
+            AtomicInt.set (ref _active, 0);
+        }
+
+        public inline bool is_active () {
+            return AtomicInt.get (ref _active) != 0;
+        }
+
+        public inline bool activate () {
+            return AtomicInt.compare_and_exchange (ref _active, 0, 1);
+        }
+
+        public inline void set (void *ptr) {
+            AtomicPointer.set (&_hazard, ptr);
+        }
+
+        public inline void *get (bool safe = true) {
+            if (safe) {
+                return (void *)AtomicPointer.get (&_hazard);
+            } else {
+                return (void *)_hazard;
+            }
+        }
+
+        public inline unowned Node? get_next () {
+            return (Node *)AtomicPointer.get (&_next);
+        }
+
+        public inline void set_next (Node *next) {
+            AtomicPointer.set (&_next, next);
+        }
+
+        public Node *_next;
+        public int _active;
+        public void *_hazard;
+    }
+}
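
Editor's note, not part of the patch: the sketch below illustrates how the mask-based helpers added above could be used to keep a "marked" flag in the least significant bit of a shared pointer. The HazardExample class, the shared field, the MARK constant and the build invocation are assumptions made for illustration only; only the HazardPointer helpers themselves come from the file above, and the call style (generic type arguments on the static helpers) mirrors the usage shown in the class documentation.

{{{
// Illustrative sketch only. Assumed build command (package name depends on the
// installed libgee version): valac --thread --pkg gee-1.0 hazard_example.vala
using Gee;

public class HazardExample {
    // Pointer shared between threads; the low bit is (hypothetically) reserved for a "marked" flag.
    private static string *shared = null;
    private const size_t MARK = 1;

    public static int main (string[] args) {
        // Publish a value atomically; set_pointer takes ownership of the new string.
        HazardPointer.set_pointer<string> (&shared, "first value");

        // Read it back safely; flags receives the bits that were masked out of the pointer.
        size_t flags;
        string? local = HazardPointer.get_pointer<string> (&shared, MARK, out flags);
        stdout.printf ("value=%s marked=%s\n", local, (flags & MARK) != 0 ? "yes" : "no");

        // Swap in a replacement and receive the previously stored value.
        string? old = HazardPointer.exchange_pointer<string> (&shared, "second value");
        stdout.printf ("old=%s\n", old);
        return 0;
    }
}
}}}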