1 // Copyright (C) 2005-2006 The Trustees of Indiana University.
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
7 // Authors: Douglas Gregor
9 #ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
10 #define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
12 #ifndef BOOST_GRAPH_USE_MPI
13 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
16 #include <boost/graph/parallel/process_group.hpp>
17 #include <boost/type_traits/is_convertible.hpp>
19 #include <boost/assert.hpp>
20 #include <boost/optional.hpp>
23 namespace boost { namespace graph { namespace detail {
25 template<typename ProcessGroup>
26 void do_synchronize(ProcessGroup& pg)
28 using boost::parallel::synchronize;
32 struct remote_set_queued {};
33 struct remote_set_immediate {};
35 template<typename ProcessGroup>
36 class remote_set_semantics
40 queued = (is_convertible<
41 typename ProcessGroup::communication_category,
42 boost::parallel::bsp_process_group_tag>::value));
45 typedef typename mpl::if_c<queued,
47 remote_set_immediate>::type type;
51 template<typename Derived, typename ProcessGroup, typename Value,
53 typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
54 class remote_update_set;
56 /**********************************************************************
57 * Remote updating set that queues messages until synchronization *
58 **********************************************************************/
59 template<typename Derived, typename ProcessGroup, typename Value,
61 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
64 typedef typename property_traits<OwnerMap>::key_type Key;
65 typedef std::vector<std::pair<Key, Value> > Updates;
66 typedef typename Updates::size_type updates_size_type;
67 typedef typename Updates::value_type updates_pair_type;
72 typedef typename ProcessGroup::process_id_type process_id_type;
75 /** Message containing the number of updates that will be sent in
76 * a msg_updates message that will immediately follow. This
77 * message will contain a single value of type
82 /** Contains (key, value) pairs with all of the updates from a
83 * particular source. The number of updates is variable, but will
84 * be provided in a msg_num_updates message that immediately
85 * preceeds this message.
91 struct handle_messages
94 handle_messages(remote_update_set* self, const ProcessGroup& pg)
95 : self(self), update_sizes(num_processes(pg), 0) { }
97 void operator()(process_id_type source, int tag)
100 case msg_num_updates:
102 // Receive the # of updates
103 updates_size_type num_updates;
104 receive(self->process_group, source, tag, num_updates);
106 update_sizes[source] = num_updates;
112 updates_size_type num_updates = update_sizes[source];
113 BOOST_ASSERT(num_updates);
115 // Receive the actual updates
116 std::vector<updates_pair_type> updates(num_updates);
117 receive(self->process_group, source, msg_updates, &updates[0],
120 // Send updates to derived "receive_update" member
121 Derived* derived = static_cast<Derived*>(self);
122 for (updates_size_type u = 0; u < num_updates; ++u)
123 derived->receive_update(source, updates[u].first, updates[u].second);
125 update_sizes[source] = 0;
132 remote_update_set* self;
133 std::vector<updates_size_type> update_sizes;
135 friend struct handle_messages;
138 remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
139 : process_group(pg, handle_messages(this, pg)),
140 updates(num_processes(pg)), owner(owner) {
144 void update(const Key& key, const Value& value)
146 if (get(owner, key) == process_id(process_group)) {
147 Derived* derived = static_cast<Derived*>(this);
148 derived->receive_update(get(owner, key), key, value);
151 updates[get(owner, key)].push_back(std::make_pair(key, value));
159 // Emit all updates and then remove them
160 process_id_type num_processes = updates.size();
161 for (process_id_type p = 0; p < num_processes; ++p) {
162 if (!updates[p].empty()) {
163 send(process_group, p, msg_num_updates, updates[p].size());
164 send(process_group, p, msg_updates,
165 &updates[p].front(), updates[p].size());
170 do_synchronize(process_group);
173 ProcessGroup process_group;
176 std::vector<Updates> updates;
180 /**********************************************************************
181 * Remote updating set that sends messages immediately *
182 **********************************************************************/
183 template<typename Derived, typename ProcessGroup, typename Value,
185 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
186 remote_set_immediate>
188 typedef typename property_traits<OwnerMap>::key_type Key;
189 typedef std::pair<Key, Value> update_pair_type;
190 typedef typename std::vector<update_pair_type>::size_type updates_size_type;
193 typedef typename ProcessGroup::process_id_type process_id_type;
197 /** Contains a (key, value) pair that will be updated. */
201 struct handle_messages
203 explicit handle_messages(remote_update_set* self, const ProcessGroup& pg)
205 { update_sizes.resize(num_processes(pg), 0); }
207 void operator()(process_id_type source, int tag)
209 // Receive the # of updates
210 BOOST_ASSERT(tag == msg_update);
211 update_pair_type update;
212 receive(self->process_group, source, tag, update);
214 // Send update to derived "receive_update" member
215 Derived* derived = static_cast<Derived*>(self);
216 derived->receive_update(source, update.first, update.second);
220 std::vector<updates_size_type> update_sizes;
221 remote_update_set* self;
223 friend struct handle_messages;
226 remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
227 : process_group(pg, handle_messages(this, pg)), owner(owner) { }
229 void update(const Key& key, const Value& value)
231 if (get(owner, key) == process_id(process_group)) {
232 Derived* derived = static_cast<Derived*>(this);
233 derived->receive_update(get(owner, key), key, value);
236 send(process_group, get(owner, key), msg_update,
237 update_pair_type(key, value));
242 typedef std::pair<process_id_type, int> probe_type;
243 handle_messages handler(this, process_group);
244 while (optional<probe_type> stp = probe(process_group))
245 if (stp->second == msg_update) handler(stp->first, stp->second);
250 do_synchronize(process_group);
253 ProcessGroup process_group;
257 } } } // end namespace boost::graph::detail
259 #endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP