/* hazardpointer.vala * * Copyright (C) 2011 Maciej Piechotka * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Lesser General Public * License as published by the Free Software Foundation; either * version 2.1 of the License, or (at your option) any later version. * This library is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * You should have received a copy of the GNU Lesser General Public * License along with this library; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA * * Author: * Maciej Piechotka */ /** * Hazard pointer is a method of protecting a pointer shared by many threads. * If you want to use atomic pointer that may be freed you should use following code: * * {{{ * string *shared_pointer = ...; * HazardPointer hptr = HazardPointer.get_hazard_pointer (&shared_pointer); * // my_string contains value from shared_pinter. It is valid as long as hptr is alive. * unowned string my_string = ptr.get (); * // instead of delete * ptr.release ((ptr) => {string *sptr = ptr;string ref = (owned)sptr;}); * }); * }}} * * In some cases you may use helper methods which might involve copying of object (and are unsafe for unowned objects): * {{{ * Gtk.Window *window = ...; * Gtk.Window? local_window = HazardPointer.get_pointer (&window); * HazardPointer.set_pointer (&window, ...) * local_window = HazardPointer.exchange_pointer (&window, null); * HazardPointer.compare_and_exchange (&window, null, local_window); * }}} * * The class also provides helper methods if least significant bits are used for storing flags. * * HazardPointers are not thread-safe (unless documentation states otherwise). */ [Compact] public class Gee.HazardPointer { // FIXME: Make it a struct /** * Creates a hazard pointer for a pointer. * * @param ptr Protected pointer */ public HazardPointer (G *ptr) { this._node = acquire (); this._node.set ((void *)ptr); } /** * Create a hazard pointer from Node. */ internal HazardPointer.from_node (Node node) { this._node = node; } /** * Gets hazard pointer from atomic pointer safely. * * @param aptr Atomic pointer. * @param mask Mask of bits. * @param mask_out Result of mask. * @return Hazard pointer containing the element. */ public static HazardPointer? get_hazard_pointer (G **aptr, size_t mask = 0, out size_t mask_out = null) { unowned Node node = acquire (); void *rptr = null; void *ptr = null; mask_out = 0; do { rptr = AtomicPointer.get ((void **)aptr); ptr = (void *)((size_t) rptr & ~mask); mask_out = (size_t) rptr & mask; node.set (ptr); } while (rptr != AtomicPointer.get ((void **)aptr)); if (ptr != null) { return new HazardPointer.from_node (node); } else { node.release (); return null; } } /** * Copy an object from atomic pointer. * * @param aptr Atomic pointer. * @param mask Mask of flags. * @param mask_out Result of mask. * @return A copy of object from atomic pointer. */ public static G? get_pointer (G **aptr, size_t mask = 0, out size_t mask_out = null) { unowned Node node = acquire (); void *rptr = null; void *ptr = null; mask_out = 0; do { rptr = AtomicPointer.get ((void **)aptr); ptr = (void *)((size_t) rptr & ~mask); mask_out = (size_t) rptr & mask; node.set (ptr); } while (rptr != AtomicPointer.get ((void **)aptr)); G? res = (G *)ptr; node.release (); return res; } /** * Exchange objects safly. * * @param aptr Atomic pointer. * @param new_ptr New value * @param mask Mask of flags. * @param new_mask New mask. * @param old_mask Previous mask mask. * @return Hazard pointer containing old value. */ public static HazardPointer? exchange_hazard_pointer (G **aptr, owned G? new_ptr, size_t mask = 0, size_t new_mask = 0, out size_t old_mask = null) { unowned Node? new_node = null; if (new_ptr != null) { new_node = acquire (); new_node.set (new_ptr); } old_mask = 0; void *new_rptr = (void *)((size_t)((owned) new_ptr) | (mask & new_mask)); unowned Node node = acquire (); void *rptr = null; void *ptr = null; do { rptr = AtomicPointer.get ((void **)aptr); ptr = (void *)((size_t) rptr & ~mask); old_mask = (size_t) rptr & mask; node.set (ptr); } while (!AtomicPointer.compare_and_exchange((void **)aptr, rptr, new_rptr)); if (new_node != null) new_node.release (); if (ptr != null) { return new HazardPointer.from_node (node); } else { node.release (); return null; } } /** * Sets object safely * * @param aptr Atomic pointer. * @param new_ptr New value * @param mask Mask of flags. * @param new_mask New mask. */ public static void set_pointer (G **aptr, owned G? new_ptr, size_t mask = 0, size_t new_mask = 0) { HazardPointer? ptr = exchange_hazard_pointer (aptr, new_ptr, mask, new_mask, null); if (ptr != null) { DestroyNotify notify = get_destroy_notify (); ptr.release ((owned)notify); } } /** * Exchange objects safly. * * @param aptr Atomic pointer. * @param new_ptr New value * @param mask Mask of flags. * @param new_mask New mask. * @param old_mask Previous mask mask. * @return Value that was previously stored. */ public static G? exchange_pointer (G **aptr, owned G? new_ptr, size_t mask = 0, size_t new_mask = 0, out size_t old_mask = null) { HazardPointer? ptr = exchange_hazard_pointer (aptr, new_ptr, mask, new_mask, out old_mask); G? rptr = ptr != null ? ptr.get () : null; return rptr; } /** * Compares and exchanges objects. * * @param aptr Atomic pointer. * @param old_ptr Old pointer. * @param _new_ptr New value. * @param old_mask Old mask. * @param new_mask New mask. * @return Value that was previously stored. */ public static bool compare_and_exchange_pointer (G **aptr, G? old_ptr, owned G? _new_ptr, size_t mask = 0, size_t old_mask = 0, size_t new_mask = 0) { G *new_ptr = (owned)_new_ptr; void *new_rptr = (void *)((size_t)(new_ptr) | (mask & new_mask)); void *old_rptr = (void *)((size_t)(old_ptr) | (mask & old_mask)); bool success = AtomicPointer.compare_and_exchange((void **)aptr, old_rptr, new_rptr); if (success) { DestroyNotify notify = get_destroy_notify (); if (old_ptr != null) { Context.get_current_context ()->release_ptr (old_ptr, (owned)notify); } } else if (new_ptr != null) { _new_ptr = (owned)new_ptr; } return success; } ~HazardPointer () { _node.release (); } /** * Gets the pointer hold by hazard pointer. * * @param other_thread Have to be set to ``true`` if accessed from thread that did not create this thread. * @return The value hold by pointer. */ public inline new unowned G get (bool other_thread = false) { return _node[other_thread]; } /** * Free the pointer. * * @param notify method freeing object */ public void release (owned DestroyNotify notify) { unowned G item = _node[false]; _node.set (null); if (item != null) { Context.get_current_context ()->release_ptr (item, (owned)notify); } } /** * Sets default policy (i.e. default policy for user-created contexts). * The policy must be concrete and should not be blocking. * * @param policy New default policy. */ public static void set_default_policy (Policy policy) requires (policy.is_concrete ()) { if (policy.is_blocking ()) warning ("Setting blocking defautl Gee.HazardPointer.Policy (there may be a deadlock).\n"); AtomicInt.set(ref _default_policy, (int)policy); } /** * Sets thread exit policy (i.e. default policy for the top-most Context). * The policy must be concrete and should not be unsafe. * * @param policy New thread policy. */ public static void set_thread_exit_policy (Policy policy) requires (policy.is_concrete ()) { if (!policy.is_safe ()) warning ("Setting unsafe globale thread-exit Gee.HazardPointer.Policy (there may be a memory leak).\n"); AtomicInt.set(ref _thread_exit_policy, (int)policy); } /** * Sets release (i.e. how exactly the released objects arefreed). * * The method can be only set before any objects is released and is not thread-safe. * * @param policy New release policy. */ public static bool set_release_policy (ReleasePolicy policy) { int old_policy = AtomicInt.get (ref release_policy); if ((old_policy & (sizeof(int) * 8 - 1)) != 0) { critical ("Attempt to change the policy of running helper. Failing."); return false; } if (!AtomicInt.compare_and_exchange (ref release_policy, old_policy, (int)policy)) { critical ("Concurrent access to release policy detected. Failing."); return false; } return true; } /** * Policy determines what happens on exit from Context. */ public enum Policy { /** * Performs default action on exit from thread. */ DEFAULT, /** * Performs the same action as on exit from current thread. */ THREAD_EXIT, /** * Goes through the free list and attempts to free un-freed elements. */ TRY_FREE, /** * Goes through the free list and attempts to free un-freed elements * untill all elements are freed. */ FREE, /** * Release the un-freed elements to either helper thread or to main loop. * Please note if the operation would block it is not performed. */ TRY_RELEASE, /** * Release the un-freed elements to either helper thread or to main loop. * Please note it may block while adding to queue. */ RELEASE; /** * Checks if the policy is concrete or if it depends on global variables. * * @return ``true`` if this policy does not depend on global variables */ public bool is_concrete () { switch (this) { case DEFAULT: case THREAD_EXIT: return false; case TRY_FREE: case FREE: case TRY_RELEASE: case RELEASE: return true; default: assert_not_reached (); } } /** * Checks if policy blocks or is lock-free. * Please note that it works on a concrete policy only. * * @return ``true`` if the policy may block the thread. */ public bool is_blocking () requires (this.is_concrete ()) { switch (this) { case TRY_FREE: case TRY_RELEASE: return false; case FREE: case RELEASE: return true; default: assert_not_reached (); } } /** * Checks if policy guarantees freeing all elements. * Please note that it works on a concrete policy only. * * @return ``true`` if the policy guarantees freeing all elements. */ public bool is_safe () requires (this.is_concrete ()) { switch (this) { case TRY_FREE: case TRY_RELEASE: return false; case FREE: case RELEASE: return true; default: assert_not_reached (); } } /** * Finds concrete policy which corresponds to given policy. * * @return Policy that corresponds to given policy at given time in given thread. */ public Policy to_concrete () ensures (result.is_concrete ()) { switch (this) { case TRY_FREE: case FREE: case TRY_RELEASE: case RELEASE: return this; case DEFAULT: return (Policy) AtomicInt.get (ref _default_policy); case THREAD_EXIT: return (Policy) AtomicInt.get (ref _thread_exit_policy); default: assert_not_reached (); } } /** * Runs the policy. * @param to_free List containing elements to free. * @return Non-empty list of not freed elements or ``null`` if all elements have been disposed. */ internal ArrayList? perform (owned ArrayList to_free) { switch (this.to_concrete ()) { case TRY_FREE: return try_free (to_free) ? (owned) to_free : null; case FREE: while (try_free (to_free)) { Thread.yield (); } return null; case TRY_RELEASE: ReleasePolicy.ensure_start (); if (_queue_mutex.trylock ()) { _queue.offer ((owned) to_free); _queue_mutex.unlock (); return null; } else { return (owned) to_free; } case RELEASE: ReleasePolicy.ensure_start (); _queue_mutex.lock (); _queue.offer ((owned) to_free); _queue_mutex.unlock (); return null; default: assert_not_reached (); } } } public delegate void DestroyNotify (void *ptr); /** * Release policy determines what happens with object freed by Policy.TRY_RELEASE * and Policy.RELEASE. */ public enum ReleasePolicy { /** * Libgee spawns helper thread to free those elements. * This is default. */ HELPER_THREAD, /** * Libgee uses GLib main loop. * This is recommended for application using GLib main loop. */ MAIN_LOOP; private static void start (ReleasePolicy self) { // FIXME: Make it non-static [bug 659778] switch (self) { case HELPER_THREAD: Thread.create (() => { Thread.self ().set_priority (ThreadPriority.LOW); while (true) { Thread.yield (); attempt_free (); } }, false); break; case MAIN_LOOP: Idle.add (() => { attempt_free (); return true; }, Priority.LOW); break; default: assert_not_reached (); } } /** * Ensures that helper methods are started. */ internal static inline void ensure_start () { int policy = AtomicInt.get (ref release_policy); if ((policy & (1 << (sizeof(int) * 8 - 1))) != 0) return; if (_queue_mutex.trylock ()) { policy = AtomicInt.get (ref release_policy); if ((policy & (1 << (sizeof(int) * 8 - 1))) == 0) { _queue = new LinkedList> (); // Hack to not lie about successfull setting policy policy = AtomicInt.exchange_and_add (ref release_policy, (int)(1 << (sizeof(int) * 8 - 1))); start ((ReleasePolicy) policy); } _queue_mutex.unlock (); } } private static inline void attempt_free () { if (_queue_mutex.trylock ()) { Collection> temp = new ArrayList> (); _queue.drain (temp); _queue_mutex.unlock (); temp.foreach ((x) => {_global_to_free.add_all (x); return true;}); } try_free (_global_to_free); } } /** * Create a new context. User does not need to create explicitly however it might be benefitial * if he is about to issue bunch of commands he might consider it benefitial to fine-tune the creation of contexts. * * {{{ * Context ctx = new Context (); * lock_free_collection.operation1 (); * // Normally on exit the thread exit operation would be executed but here the default operation of * // child context is executed. * lock_free_collection.operation2 (); * }}} * * Please note that the Context in implicitly part of stack and: * * 1. It cannot be moved between threads. * 2. If in given thread the child (created later) context is alive parent must be alive as well. */ [Compact] public class Context { // FIXME: Should be struct public Context (Policy? policy = null) { this._to_free = new ArrayList (); this._parent = _current_context.get (); _current_context.set (this, null); if (policy == null) { if (_parent == null) { _policy = (Policy)AtomicInt.get (ref _thread_exit_policy); } else { _policy = (Policy)AtomicInt.get (ref _default_policy); } } else { this._policy = policy.to_concrete (); } #if DEBUG stderr.printf ("Entering context %p (policy %s, parent %p)\n", this, _policy != null ? _policy.to_string () : null, _parent); #endif } ~Context () { #if DEBUG stderr.printf ("Exiting context %p (policy %s, parent %p)\n", this, _policy != null ? _policy.to_string () : null, _parent); #endif int size = _to_free.size; bool clean_parent = false; if (size > 0) { ArrayList? remaining; if (_parent == null || size >= THRESHOLD) remaining = _policy.perform ((owned) _to_free); else remaining = (owned) _to_free; if (remaining != null) { assert (_parent != null); _parent->_to_free.add_all (remaining); clean_parent = true; } } #if DEBUG stderr.printf ("Setting current context to %p\n", _parent); #endif _current_context.set (_parent, null); if (clean_parent) HazardPointer.try_free (_parent->_to_free); } /** * Tries to free all freed pointer in current context. */ public void try_free () { HazardPointer.try_free (_to_free); } /** * Ensure that whole context is freed. Plase note that it might block. */ public void free_all () { while (HazardPointer.try_free (_to_free)) Thread.yield (); } /** * Tries to push the current context to releaser. */ public void try_release () { if (_queue_mutex.trylock ()) { _queue.offer ((owned) _to_free); _to_free = new ArrayList (); _queue_mutex.unlock (); } } /** * Pushes the current context to releaser. Plase note that it might block. */ public void release () { _queue_mutex.lock (); _queue.offer ((owned) _to_free); _to_free = new ArrayList (); _queue_mutex.unlock (); } /** * Add pointer to freed array. */ internal inline void release_ptr (void *ptr, owned DestroyNotify notify) { FreeNode *node = new FreeNode (); node->pointer = ptr; node->destroy_notify = (owned)notify; _to_free.add (node); if (_to_free.size >= THRESHOLD) HazardPointer.try_free (_to_free); } /** * Gets current context. */ internal inline static Context *get_current_context () { return _current_context.get (); } internal Context *_parent; internal ArrayList _to_free; internal Policy? _policy; internal static StaticPrivate _current_context; internal static StaticPrivate _root_context; private static uint THRESHOLD = 10; } /** * Gets a new hazard pointer node. * * @return new hazard pointer node. */ internal static inline unowned Node acquire () { for (unowned Node? curr = get_head (); curr != null; curr = curr.get_next ()) if (curr.activate ()) return curr; Node *node = new Node (); Node *old_head = null; do { node->set_next (old_head = (Node *)AtomicPointer.get (&_head)); } while (!AtomicPointer.compare_and_exchange (&_head, old_head, node)); return node; } /** * Tries to free from list. * * @return ``true`` if list is empty. */ internal static bool try_free (ArrayList to_free) { Collection used = new HashSet(); for (unowned Node? current = get_head (); current != null; current = current.get_next ()) { used.add (current.get ()); } for (int i = 0; i < to_free.size;) { FreeNode *current = to_free[i]; if (used.contains (current->pointer)) { #if DEBUG stderr.printf ("Skipping freeing %p\n", current->pointer); #endif i++; } else { #if DEBUG stderr.printf ("Freeing %p\n", current->pointer); #endif FreeNode *cur = to_free.remove_at (to_free.size - 1); if (i != to_free.size) { FreeNode *temp = to_free[i]; to_free[i] = cur; cur = temp; } cur->destroy_notify (cur->pointer); delete cur; } } return to_free.size > 0; } /** * Gets head of hazard pointers. * @return Hazard pointer head. */ internal static unowned Node? get_head () { return (Node *)AtomicPointer.get(&_head); } internal unowned Node _node; internal static Node *_head = null; internal static int _default_policy = (int)Policy.TRY_FREE; internal static int _thread_exit_policy = (int)Policy.RELEASE; internal static int release_policy = 0; internal static Queue> _queue; internal static StaticMutex _queue_mutex; internal static ArrayList _global_to_free; internal static DestroyNotify get_destroy_notify () { return (ptr) => { G *gptr = ptr; G obj = (owned)gptr; obj = null; }; } [Compact] internal class FreeNode { public void *pointer; public DestroyNotify destroy_notify; } /** * List of used pointers. */ [Compact] internal class Node { public Node () { AtomicPointer.set (&_hazard, null); AtomicInt.set (ref _active, 1); } inline ~Node () { delete _next; } public void release () { AtomicPointer.set (&_hazard, null); AtomicInt.set (ref _active, 0); } public inline bool is_active () { return AtomicInt.get (ref _active) != 0; } public inline bool activate () { return AtomicInt.compare_and_exchange (ref _active, 0, 1); } public inline void set (void *ptr) { AtomicPointer.set (&_hazard, ptr); } public inline void *get (bool safe = true) { if (safe) { return (void *)AtomicPointer.get (&_hazard); } else { return (void *)_hazard; } } public inline unowned Node? get_next () { return (Node *)AtomicPointer.get (&_next); } public inline void set_next (Node *next) { AtomicPointer.set (&_next, next); } public Node *_next; public int _active; public void *_hazard; } }