Implement ConcurrentList
authorMaciej Piechotka <uzytkownik2@gmail.com>
Sat, 26 Mar 2011 04:24:24 +0000 (05:24 +0100)
committerMaciej Piechotka <uzytkownik2@gmail.com>
Sun, 25 Sep 2011 19:28:20 +0000 (21:28 +0200)
 - After porting to volatile the freeing of data does not work

gee/Makefile.am
gee/concurrentlist.vala [new file with mode: 0644]
tests/Makefile.am
tests/testconcurrentlist.vala [new file with mode: 0644]
tests/testmain.vala

index cf41f42..7800cf4 100644 (file)
@@ -22,6 +22,7 @@ libgee_0_8_la_SOURCES = \
        bidirmapiterator.vala \
        collection.vala \
        comparable.vala \
+       concurrentlist.vala \
        deque.vala \
        functions.vala \
        hashable.vala \
diff --git a/gee/concurrentlist.vala b/gee/concurrentlist.vala
new file mode 100644 (file)
index 0000000..91bfa22
--- /dev/null
@@ -0,0 +1,585 @@
+/* concurrentlist.vala
+ *
+ * Copyright (C) 2011  Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
+ *
+ * Author:
+ *     Maciej Piechotka <uzytkownik2@gmail.com>
+ */
+
+/**
+ * A single-linked list. This implementation is based on
+ * [[www.cse.yorku.ca/~ruppert/papers/lfll.pdf|Mikhail Fomitchev and  Eric Ruppert paper ]].
+ *
+ * Many threads are allowed to operate on the same structure as well as modification
+ * of structure during iteration is allowed. However the change may not be immidiatly
+ * visible to other threads.
+ */
+public class Gee.ConcurrentList<G> : AbstractList<G> {
+       /**
+        * The elements' equality testing function.
+        */
+       public Gee.EqualDataFunc equal_func { private set; get; }
+
+       /**
+        * Construct new, empty single linked list
+        *
+        * If not provided, the function parameter is requested to the
+        * {@link Functions} function factory methods.
+        *
+        * @param equal_func an optional element equality testing function
+        */
+       public ConcurrentList (owned Gee.EqualDataFunc? equal_func = null) {
+               if (equal_func == null)
+                       equal_func = Gee.Functions.get_equal_func_for (typeof (G));
+               this.equal_func = (owned)equal_func;
+               _head = new Node<G>.head ();
+               HazardPointer.set_pointer<Node<G>> (&_tail, _head);
+       }
+
+       ~ConcurrentList () {
+               HazardPointer.Context ctx = new HazardPointer.Context ();
+               _head = null;
+               HazardPointer.set_pointer<Node<G>> (&_tail, null);
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override bool read_only {
+               get {
+                       return false;
+               }
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override int size {
+               get {
+                       HazardPointer.Context ctx = new HazardPointer.Context ();
+                       int result = 0;
+                       for (var iter = iterator (); iter.next ();)
+                               result++;
+                       return result;
+               }
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override bool is_empty {
+               get {
+                       return !iterator ().next ();
+               }
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override bool contains (G item) {
+               HazardPointer.Context ctx = new HazardPointer.Context ();
+               for (var iter = iterator (); iter.next ();)
+                       if (equal_func (item, iter.get ()))
+                               return true;
+               return false;
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override bool add (G item) {
+               HazardPointer.Context ctx = new HazardPointer.Context ();
+               Node<G> node = new Node<G> (item);
+               node.insert (get_tail (), null);
+               return true;
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override bool remove (G item) {
+               HazardPointer.Context ctx = new HazardPointer.Context ();
+               Gee.Iterator<G> iter = iterator ();
+               while (iter.next ()) {
+                       if (equal_func (item, iter.get ())) {
+                               iter.remove ();
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override void clear () {
+               HazardPointer.Context ctx = new HazardPointer.Context ();
+               var iter = iterator ();
+               while (iter.next ())
+                       iter.remove ();
+               HazardPointer.set_pointer (&_tail, _head);
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override Gee.Iterator<G> iterator () {
+               return new Iterator<G> (_head);
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override ListIterator<G> list_iterator () {
+               return new Iterator<G> (_head);
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override G? get (int index) {
+               HazardPointer.Context ctx = new HazardPointer.Context ();
+               assert (index >= 0);
+               for (var iterator = iterator (); iterator.next ();)
+                       if (index-- == 0)
+                               return iterator.get ();
+               assert_not_reached ();
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override void set (int index, G item) {
+               HazardPointer.Context ctx = new HazardPointer.Context ();
+               assert (index >= 0);
+               for (var iterator = list_iterator (); iterator.next ();) {
+                       if (index-- == 0) {
+                               iterator.set (item);
+                               return;
+                       }
+               }
+               assert_not_reached ();
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override int index_of (G item) {
+               HazardPointer.Context ctx = new HazardPointer.Context ();
+               int index = 0;
+               for (var iterator = list_iterator (); iterator.next (); index++)
+                       if (equal_func (item, iterator.get ()))
+                               return index;
+               return -1;
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override void insert (int index, G item) {
+               HazardPointer.Context ctx = new HazardPointer.Context ();
+               assert (index >= 0);
+               if (index == 0) {
+                       var prev = _head;
+                       var next = _head.get_next ();
+                       Node<G> new_node = new Node<G> (item);
+                       new_node.insert (prev, next);
+               } else {
+                       for (var iterator = list_iterator (); iterator.next ();) {
+                               if (--index == 0) {
+                                       iterator.add (item);
+                                       return;
+                               }
+                       }
+                       assert_not_reached ();
+               }
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override G remove_at (int index) {
+               HazardPointer.Context ctx = new HazardPointer.Context ();
+               for (var iterator = list_iterator (); iterator.next ();) {
+                       if (index-- == 0) {
+                               G data = iterator.get ();
+                               iterator.remove ();
+                               return data;
+                       }
+               }
+               assert_not_reached ();
+       }
+
+       /**
+        * {@inheritDoc}
+        */
+       public override List<G>? slice (int start, int end) {
+               HazardPointer.Context ctx = new HazardPointer.Context ();
+               assert (0 <= start);
+               assert (start <= end);
+               var list = new ConcurrentList<G> (equal_func);
+               var iterator = iterator ();
+               int idx = 0;
+               for (; iterator.next (); idx++)
+                       if (idx >= start && idx < end)
+                               list.add (iterator.get ());
+                       else if (idx >= end)
+                               break;
+               assert (idx >= end);
+               return list;
+       }
+
+       private inline Node<G> update_tail () {
+               Node<G> tail = HazardPointer.get_pointer (&_tail);
+               Node.backtrace<G> (ref tail);
+               Node.search_for<G> (null, ref tail);
+               HazardPointer.set_pointer<Node<G>> (&_tail, tail);
+               return tail;
+       }
+
+       private inline Node<G> get_tail () {
+               return update_tail ();
+       }
+
+       private Node<G> _head;
+       private Node<G> *_tail;
+
+       private class Iterator<G> : Object, Gee.Traversable<G>, Gee.Iterator<G>, ListIterator<G> {
+               public Iterator (Node<G> head) {
+                       _started = false;
+                       _removed = false;
+                       _index = -1;
+                       _prev = null;
+                       _curr = head;
+               }
+
+               public bool next () {
+                       HazardPointer.Context ctx = new HazardPointer.Context ();
+                       Node<G>? _old_prev = _removed ? _prev : null;
+                       bool success = Node.proceed<G> (ref _prev, ref _curr);
+                       if (success) {
+                               if (_removed)
+                                       _prev = _old_prev;
+                               _removed = false;
+                               _started = true;
+                               _index++;
+                       }
+                       return success;
+               }
+
+               public bool has_next () {
+                       HazardPointer.Context ctx = new HazardPointer.Context ();
+                       Node<G>? prev = _prev;
+                       Node<G>? curr = _curr;
+                       return Node.proceed<G> (ref prev, ref curr);
+               }
+
+               public new G get () {
+                       HazardPointer.Context ctx = new HazardPointer.Context ();
+                       assert (valid);
+                       return HazardPointer.get_pointer<G> (&_curr._data);
+               }
+
+               public new void set (G item) {
+                       HazardPointer.Context ctx = new HazardPointer.Context ();
+                       assert (valid);
+#if DEBUG
+                       G item_copy = item;
+                       stderr.printf ("  Setting data %p to %p\n", _curr, item_copy);
+                       HazardPointer.set_pointer<G> (&_curr._data, (owned)item_copy);
+#else
+                       HazardPointer.set_pointer<G> (&_curr._data, item);
+#endif
+               }
+
+               public void remove () {
+                       HazardPointer.Context ctx = new HazardPointer.Context ();
+                       assert (valid);
+                       _curr.remove (_prev);
+                       _removed = true;
+                       _index--;
+               }
+
+               public bool valid {
+                       get { return _started && !_removed && _curr != null; }
+               }
+
+               public bool read_only { get { return false; } }
+
+               public int index() {
+                       assert (valid);
+                       return _index;
+               }
+
+               public void add (G item) {
+                       HazardPointer.Context ctx = new HazardPointer.Context ();
+                       assert (valid);
+                       if (!Node.proceed<G> (ref _prev, ref _curr)) {
+                               _prev = (owned)_curr;
+                               _curr = null;
+                       }
+                       Node<G> new_node = new Node<G> (item);
+                       new_node.insert (_prev, _curr);
+                       _curr = (owned)new_node;
+                       _index++;
+               }
+
+               public new void foreach (ForallFunc<G> f) {
+                       HazardPointer.Context ctx = new HazardPointer.Context ();
+                       if (_started && !_removed)
+                               f (HazardPointer.get_pointer<G> (&_curr._data));
+                       Node<G>? _old_prev = _removed ? _prev : null;
+                       while (Node.proceed<G> (ref _prev, ref _curr)) {
+                               if (_removed)
+                                       _prev = _old_prev;
+                               _removed = false;
+                               _started = true;
+                               _index++;
+                               f (HazardPointer.get_pointer<G> (&_curr._data));
+                       }
+               }
+
+               public Gee.Iterator<G> stream<A> (owned StreamFunc<G, A> f) {
+                       return Gee.Iterator.stream_impl<G, A> (this, (owned)f);
+               }
+
+               public Gee.Iterator<G> filter (owned Predicate<G> f) {
+                       return Traversable.filter_impl<G> (this, (owned)f);
+               }
+
+               public Gee.Iterator<G> chop (int offset, int length = -1) {
+                       return Traversable.chop_impl<G> (this, offset, length);
+               }
+
+               private bool _started;
+               private bool _removed;
+               private int _index;
+               private Node<G> _prev;
+               private Node<G>? _curr;
+       }
+
+       private class Node<G> {
+               public inline Node (G data) {
+                       AtomicPointer.set (&_succ, null);
+                       AtomicPointer.set (&_backlink, null);
+                       G data_copy = data;
+                       G *data_ptr = (owned)data_copy;
+#if DEBUG
+                       stderr.printf ("  Creating node %p with data %p\n", this, data_ptr);
+#endif
+                       AtomicPointer.set (&_data, (owned)data_ptr);
+               }
+
+               public inline Node.head () {
+                       AtomicPointer.set (&_succ, null);
+                       AtomicPointer.set (&_backlink, null);
+                       AtomicPointer.set (&_data, null);
+#if DEBUG
+                       stderr.printf ("  Creating head node %p\n", this);
+#endif
+               }
+
+               inline ~Node () {
+                       HazardPointer.set_pointer<Node<G>> (&_succ, null, 3);
+                       HazardPointer.set_pointer<Node<G>> (&_backlink, null);
+#if DEBUG
+                       G? old_data = HazardPointer.exchange_pointer (&_data, null);
+                       stderr.printf ("  Freeing node %p (with data %p)\n", this, old_data);
+#else
+                       HazardPointer.set_pointer<G> (&_data, null);
+#endif
+               }
+
+               public static inline bool proceed<G> (ref Node<G> prev, ref Node<G> curr, bool force = false) {
+                       Node<G> next = curr.get_next ();
+                       while (next != null) {
+                               State next_state = next.get_state ();
+                               State curr_state;
+                               Node<G> curr_next = curr.get_succ (out curr_state);
+                               if (next_state != State.MARKED || (curr_state == State.MARKED && curr_next == next))
+                                       break;
+                               if (curr_next == next)
+                                       next.help_marked (curr);
+                               next = curr_next;
+                       }
+                       bool success = next != null;
+                       if (success || force) {
+                               prev = (owned)curr;
+                               curr = (owned)next;
+#if DEBUG
+                               stderr.printf ("  Procceed to %p (previous %p)\n", curr, prev);
+#endif
+                       }
+                       return success;
+               }
+
+               public static inline bool search_for<G> (Node<G>? goal, ref Node<G>? prev) {
+                       Node<G>? curr = prev.get_next ();
+                       while ((curr != goal || curr != null) && proceed<G> (ref prev, ref curr, true));
+                       return curr == goal;
+               }
+
+               public inline bool remove (Node<G> prev_node) {
+#if DEBUG
+                       stderr.printf ("  Removing %p (previous %p)\n", this, prev_node);
+#endif
+                       Node<G>? prev = prev_node;
+                       bool result = try_flag (ref prev);
+                       if (prev != null)
+                               help_flagged (prev);
+                       return result;
+               }
+
+               public inline void insert (owned Node<G> prev, Node<G>? next) {
+#if DEBUG
+                       stderr.printf ("  Inserting %p between %p and %p\n", this, prev, next);
+#endif
+                       while (true) {
+                               State prev_state;
+                               Node<G>? prev_next = get_succ (out prev_state);
+                               if (prev_state == State.FLAGGED) {
+                                       prev_next.help_flagged (prev);
+                               } else {
+                                       set_succ (next, State.NONE);
+                                       bool result = prev.compare_and_exchange (next, State.NONE, this, State.NONE);
+                                       if (result)
+                                               return;
+                                       prev_next = get_succ (out prev_state);
+                                       if (prev_state == State.FLAGGED)
+                                               prev_next.help_flagged (prev);
+                                       backtrace<G> (ref prev);
+                               }
+                               search_for<G> (next, ref prev);
+                       }
+                       
+               }
+
+               public inline void help_flagged (Node<G> prev) {
+#if DEBUG
+                       stderr.printf ("    Help flagging %p (previous %p)\n", this, prev);
+#endif
+                       set_backlink (prev);
+                       if (get_state () == State.MARKED)
+                               try_mark ();
+                       help_marked (prev);
+               }
+
+               public inline void try_mark () {
+#if DEBUG
+                       stderr.printf ("    Try flagging %p\n", this);
+#endif
+                       do {
+                               Node<G>? next_node = get_next ();
+                               bool result = compare_and_exchange (next_node, State.NONE, next_node, State.MARKED);
+                               if (!result && get_state () == State.FLAGGED)
+                                       help_flagged (next_node);
+                       } while (get_state () != State.MARKED);
+               }
+
+               public inline void help_marked (Node<G> prev_node) {
+#if DEBUG
+                       stderr.printf ("    Help marking %p (previous %p)\n", this, prev_node);
+#endif
+                       prev_node.compare_and_exchange (this, State.FLAGGED, get_next (), State.NONE);
+               }
+
+               public inline bool try_flag (ref Node<G>? prev_node) {
+#if DEBUG
+                       stderr.printf ("    Try flagging %p (previous %p)\n", this, prev_node);
+#endif
+                       while (true) {
+                               if (prev_node.compare_succ (this, State.FLAGGED))
+                                       return false;
+                               bool result = prev_node.compare_and_exchange (this, State.NONE, this, State.FLAGGED);
+                               if (result)
+                                       return true;
+                               State result_state;
+                               Node<G>? result_node = prev_node.get_succ (out result_state);
+                               if (result_node == this && result_state == State.FLAGGED)
+                                       return false;
+                               backtrace<G> (ref prev_node);
+                               if (!search_for<G> (this, ref prev_node)) {
+                                       prev_node = null;
+                                       return false;
+                               }
+                       }
+               }
+
+               public static inline void backtrace<G> (ref Node<G>? curr) {
+                       while (curr.get_state () == State.MARKED)
+                               curr = curr.get_backlink ();
+               }
+
+               public inline bool compare_and_exchange (Node<G>? old_node, State old_state, Node<G>? new_node, State new_state) {
+#if DEBUG
+                       bool b = HazardPointer.compare_and_exchange_pointer (&_succ, old_node, new_node, 3, (size_t)old_state, (size_t)new_state);
+                       stderr.printf ("      Setting %p.succ to (%p, %s) if %p.succ is (%p, %s): %s\n", this, new_node, new_state.to_string (), this, old_node, old_state.to_string (), b ? "success" : "failure");
+                       return b;
+#else
+                       return HazardPointer.compare_and_exchange_pointer (&_succ, old_node, new_node, 3, (size_t)old_state, (size_t)new_state);
+#endif
+               }
+
+               public inline bool compare_succ (Node<G>? next, State state) {
+                       size_t cur = (size_t)AtomicPointer.get (&_succ);
+                       return cur == ((size_t)next | (size_t)state);
+               }
+
+               public inline Node<G>? get_next () {
+                       return get_succ (null);
+               }
+
+               public inline State get_state () {
+                       return (State)((size_t)AtomicPointer.get (&_succ) & 3);
+               }
+
+               public inline Node<G>? get_succ (out State state) {
+                       size_t rstate;
+                       Node<G>? succ = HazardPointer.get_pointer<Node<G>> (&_succ, 3, out rstate);
+                       state = (State)rstate;
+                       return (owned)succ;
+               }
+
+               public inline void set_succ (Node<G>? next, State state) {
+#if DEBUG
+                       stderr.printf ("      Setting %p.succ to (%p, %s)\n", this, next, state.to_string ());
+#endif
+                       HazardPointer.set_pointer<Node<G>> (&_succ, next, 3, (size_t)state);
+               }
+
+               public inline Node<G>? get_backlink () {
+                       return HazardPointer.get_pointer<Node<G>> (&_backlink);
+               }
+
+               public inline void set_backlink (Node<G>? backlink) {
+#if DEBUG
+                       stderr.printf ("      Setting backlink from %p to %p\n", this, backlink);
+#endif
+                       HazardPointer.compare_and_exchange_pointer<Node<G>> (&_backlink, null, backlink);
+               }
+
+               public Node<G> *_succ;
+               public Node<G> *_backlink;
+               public G *_data;
+       }
+
+       private enum State {
+               NONE = 0,
+               MARKED = 1,
+               FLAGGED = 2
+       }
+}
index 5cc570c..a3bcafe 100644 (file)
@@ -9,6 +9,7 @@ tests_SOURCES = \
        testbidirlist.vala \
        testcase.vala \
        testcollection.vala \
+       testconcurrentlist.vala \
        testdeque.vala \
        testfunctions.vala \
        testhashmap.vala \
diff --git a/tests/testconcurrentlist.vala b/tests/testconcurrentlist.vala
new file mode 100644 (file)
index 0000000..3bc5039
--- /dev/null
@@ -0,0 +1,38 @@
+/* testconcurrentlist.vala
+ *
+ * Copyright (C) 2011  Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
+ *
+ * Author:
+ *     Maciej Piechotka <uzytkownik2@gmail.com>
+ */
+
+using Gee;
+
+public class ConcurrentListTests : ListTests {
+       public ConcurrentListTests () {
+               base ("ConcurrentList");
+       }
+
+       public override void set_up () {
+               test_collection = new Gee.ConcurrentList<string> ();
+       }
+
+       public override void tear_down () {
+               test_collection = null;
+       }
+}
+
index 214a241..b605f70 100644 (file)
@@ -25,6 +25,7 @@ void main (string[] args) {
        Test.init (ref args);
 
        TestSuite.get_root ().add_suite (new ArrayListTests ().get_suite ());
+       TestSuite.get_root ().add_suite (new ConcurrentListTests ().get_suite ());
        TestSuite.get_root ().add_suite (new FunctionsTests ().get_suite ());
        TestSuite.get_root ().add_suite (new HashMapTests ().get_suite ());
        TestSuite.get_root ().add_suite (new HashMultiMapTests ().get_suite ());