1 // Copyright (C) 2018 Alain Miniussi <alain.miniussi@oca.eu>.
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
7 // Request implementation dtails
9 // This header should be included only after the communicator and request
10 // classes has been defined.
11 #ifndef BOOST_MPI_REQUEST_HANDLERS_HPP
12 #define BOOST_MPI_REQUEST_HANDLERS_HPP
14 #include <boost/mpi/skeleton_and_content_types.hpp>
16 namespace boost { namespace mpi {
20 * Internal data structure that stores everything required to manage
21 * the receipt of serialized data via a request object.
24 struct serialized_irecv_data {
25 serialized_irecv_data(const communicator& comm, T& value)
26 : m_ia(comm), m_value(value) {}
28 void deserialize(status& stat)
40 struct serialized_irecv_data<packed_iarchive>
42 serialized_irecv_data(communicator const&, packed_iarchive& ia) : m_ia(ia) { }
44 void deserialize(status&) { /* Do nothing. */ }
47 packed_iarchive& m_ia;
51 * Internal data structure that stores everything required to manage
52 * the receipt of an array of serialized data via a request object.
55 struct serialized_array_irecv_data
57 serialized_array_irecv_data(const communicator& comm, T* values, int n)
58 : m_count(0), m_ia(comm), m_values(values), m_nb(n) {}
60 void deserialize(status& stat);
69 void serialized_array_irecv_data<T>::deserialize(status& stat)
72 T* end = m_values+m_nb;
80 * Internal data structure that stores everything required to manage
81 * the receipt of an array of primitive data but unknown size.
82 * Such an array can have been send with blocking operation and so must
83 * be compatible with the (size_t,raw_data[]) format.
85 template<typename T, class A>
86 struct dynamic_array_irecv_data
88 BOOST_STATIC_ASSERT_MSG(is_mpi_datatype<T>::value, "Can only be specialized for MPI datatypes.");
90 dynamic_array_irecv_data(std::vector<T,A>& values)
91 : m_count(-1), m_values(values) {}
94 std::vector<T,A>& m_values;
98 struct serialized_irecv_data<const skeleton_proxy<T> >
100 serialized_irecv_data(const communicator& comm, skeleton_proxy<T> proxy)
101 : m_isa(comm), m_ia(m_isa.get_skeleton()), m_proxy(proxy) { }
103 void deserialize(status& stat)
105 m_isa >> m_proxy.object;
110 packed_skeleton_iarchive m_isa;
111 packed_iarchive& m_ia;
112 skeleton_proxy<T> m_proxy;
116 struct serialized_irecv_data<skeleton_proxy<T> >
117 : public serialized_irecv_data<const skeleton_proxy<T> >
119 typedef serialized_irecv_data<const skeleton_proxy<T> > inherited;
121 serialized_irecv_data(const communicator& comm, const skeleton_proxy<T>& proxy)
122 : inherited(comm, proxy) { }
126 #if BOOST_MPI_VERSION >= 3
128 class request::probe_handler
129 : public request::handler,
133 template<typename I1>
134 probe_handler(communicator const& comm, int source, int tag, I1& i1)
139 // no variadic template for now
140 template<typename I1, typename I2>
141 probe_handler(communicator const& comm, int source, int tag, I1& i1, I2& i2)
142 : Data(comm, i1, i2),
148 bool active() const { return m_source != MPI_PROC_NULL; }
149 optional<MPI_Request&> trivial() { return boost::none; }
150 void cancel() { m_source = MPI_PROC_NULL; }
155 BOOST_MPI_CHECK_RESULT(MPI_Mprobe, (m_source,m_tag,m_comm,&msg,&stat.m_status));
156 return unpack(msg, stat);
159 optional<status> test() {
163 BOOST_MPI_CHECK_RESULT(MPI_Improbe, (m_source,m_tag,m_comm,&flag,&msg,&stat.m_status));
165 return unpack(msg, stat);
167 return optional<status>();
172 friend class request;
174 status unpack(MPI_Message& msg, status& stat) {
176 MPI_Datatype datatype = this->Data::datatype();
177 BOOST_MPI_CHECK_RESULT(MPI_Get_count, (&stat.m_status, datatype, &count));
178 this->Data::resize(count);
179 BOOST_MPI_CHECK_RESULT(MPI_Mrecv, (this->Data::buffer(), count, datatype, &msg, &stat.m_status));
180 this->Data::deserialize();
181 m_source = MPI_PROC_NULL;
186 communicator const& m_comm;
190 #endif // BOOST_MPI_VERSION >= 3
194 struct dynamic_primitive_array_data {
195 dynamic_primitive_array_data(communicator const&, A& arr) : m_buffer(arr) {}
197 void* buffer() { return m_buffer.data(); }
198 void resize(std::size_t sz) { m_buffer.resize(sz); }
199 void deserialize() {}
200 MPI_Datatype datatype() { return get_mpi_datatype<typename A::value_type>(); }
206 struct serialized_data {
207 serialized_data(communicator const& comm, T& value) : m_archive(comm), m_value(value) {}
209 void* buffer() { return m_archive.address(); }
210 void resize(std::size_t sz) { m_archive.resize(sz); }
211 void deserialize() { m_archive >> m_value; }
212 MPI_Datatype datatype() { return MPI_PACKED; }
214 packed_iarchive m_archive;
219 struct serialized_data<packed_iarchive> {
220 serialized_data(communicator const& comm, packed_iarchive& ar) : m_archive(ar) {}
222 void* buffer() { return m_archive.address(); }
223 void resize(std::size_t sz) { m_archive.resize(sz); }
224 void deserialize() {}
225 MPI_Datatype datatype() { return MPI_PACKED; }
227 packed_iarchive& m_archive;
231 struct serialized_data<const skeleton_proxy<T> > {
232 serialized_data(communicator const& comm, skeleton_proxy<T> skel)
236 void* buffer() { return m_archive.get_skeleton().address(); }
237 void resize(std::size_t sz) { m_archive.get_skeleton().resize(sz); }
238 void deserialize() { m_archive >> m_proxy.object; }
239 MPI_Datatype datatype() { return MPI_PACKED; }
241 skeleton_proxy<T> m_proxy;
242 packed_skeleton_iarchive m_archive;
246 struct serialized_data<skeleton_proxy<T> >
247 : public serialized_data<const skeleton_proxy<T> > {
248 typedef serialized_data<const skeleton_proxy<T> > super;
249 serialized_data(communicator const& comm, skeleton_proxy<T> skel)
250 : super(comm, skel) {}
254 struct serialized_array_data {
255 serialized_array_data(communicator const& comm, T* values, int nb)
256 : m_archive(comm), m_values(values), m_nb(nb) {}
258 void* buffer() { return m_archive.address(); }
259 void resize(std::size_t sz) { m_archive.resize(sz); }
261 T* end = m_values + m_nb;
267 MPI_Datatype datatype() { return MPI_PACKED; }
269 packed_iarchive m_archive;
276 class BOOST_MPI_DECL request::legacy_handler : public request::handler {
278 legacy_handler(communicator const& comm, int source, int tag);
281 for (int i = 0; i < 2; ++i) {
282 if (m_requests[i] != MPI_REQUEST_NULL) {
283 BOOST_MPI_CHECK_RESULT(MPI_Cancel, (m_requests+i));
289 optional<MPI_Request&> trivial();
291 MPI_Request m_requests[2];
298 class request::legacy_serialized_handler
299 : public request::legacy_handler,
300 protected detail::serialized_irecv_data<T> {
302 typedef detail::serialized_irecv_data<T> extra;
303 legacy_serialized_handler(communicator const& comm, int source, int tag, T& value)
304 : legacy_handler(comm, source, tag),
306 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
307 (&this->extra::m_count, 1,
308 get_mpi_datatype(this->extra::m_count),
309 source, tag, comm, m_requests+0));
315 if (m_requests[1] == MPI_REQUEST_NULL) {
316 // Wait for the count message to complete
317 BOOST_MPI_CHECK_RESULT(MPI_Wait,
318 (m_requests, &stat.m_status));
319 // Resize our buffer and get ready to receive its data
320 this->extra::m_ia.resize(this->extra::m_count);
321 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
322 (this->extra::m_ia.address(), this->extra::m_ia.size(), MPI_PACKED,
323 stat.source(), stat.tag(),
324 MPI_Comm(m_comm), m_requests + 1));
327 // Wait until we have received the entire message
328 BOOST_MPI_CHECK_RESULT(MPI_Wait,
329 (m_requests + 1, &stat.m_status));
331 this->deserialize(stat);
335 optional<status> test() {
339 if (m_requests[1] == MPI_REQUEST_NULL) {
340 // Check if the count message has completed
341 BOOST_MPI_CHECK_RESULT(MPI_Test,
342 (m_requests, &flag, &stat.m_status));
344 // Resize our buffer and get ready to receive its data
345 this->extra::m_ia.resize(this->extra::m_count);
346 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
347 (this->extra::m_ia.address(), this->extra::m_ia.size(),MPI_PACKED,
348 stat.source(), stat.tag(),
349 MPI_Comm(m_comm), m_requests + 1));
351 return optional<status>(); // We have not finished yet
354 // Check if we have received the message data
355 BOOST_MPI_CHECK_RESULT(MPI_Test,
356 (m_requests + 1, &flag, &stat.m_status));
358 this->deserialize(stat);
361 return optional<status>();
366 class request::legacy_serialized_array_handler
367 : public request::legacy_handler,
368 protected detail::serialized_array_irecv_data<T> {
369 typedef detail::serialized_array_irecv_data<T> extra;
372 legacy_serialized_array_handler(communicator const& comm, int source, int tag, T* values, int n)
373 : legacy_handler(comm, source, tag),
374 extra(comm, values, n) {
375 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
376 (&this->extra::m_count, 1,
377 get_mpi_datatype(this->extra::m_count),
378 source, tag, comm, m_requests+0));
383 if (m_requests[1] == MPI_REQUEST_NULL) {
384 // Wait for the count message to complete
385 BOOST_MPI_CHECK_RESULT(MPI_Wait,
386 (m_requests, &stat.m_status));
387 // Resize our buffer and get ready to receive its data
388 this->extra::m_ia.resize(this->extra::m_count);
389 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
390 (this->extra::m_ia.address(), this->extra::m_ia.size(), MPI_PACKED,
391 stat.source(), stat.tag(),
392 MPI_Comm(m_comm), m_requests + 1));
395 // Wait until we have received the entire message
396 BOOST_MPI_CHECK_RESULT(MPI_Wait,
397 (m_requests + 1, &stat.m_status));
399 this->deserialize(stat);
403 optional<status> test() {
407 if (m_requests[1] == MPI_REQUEST_NULL) {
408 // Check if the count message has completed
409 BOOST_MPI_CHECK_RESULT(MPI_Test,
410 (m_requests, &flag, &stat.m_status));
412 // Resize our buffer and get ready to receive its data
413 this->extra::m_ia.resize(this->extra::m_count);
414 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
415 (this->extra::m_ia.address(), this->extra::m_ia.size(),MPI_PACKED,
416 stat.source(), stat.tag(),
417 MPI_Comm(m_comm), m_requests + 1));
419 return optional<status>(); // We have not finished yet
422 // Check if we have received the message data
423 BOOST_MPI_CHECK_RESULT(MPI_Test,
424 (m_requests + 1, &flag, &stat.m_status));
426 this->deserialize(stat);
429 return optional<status>();
433 template<typename T, class A>
434 class request::legacy_dynamic_primitive_array_handler
435 : public request::legacy_handler,
436 protected detail::dynamic_array_irecv_data<T,A>
438 typedef detail::dynamic_array_irecv_data<T,A> extra;
441 legacy_dynamic_primitive_array_handler(communicator const& comm, int source, int tag, std::vector<T,A>& values)
442 : legacy_handler(comm, source, tag),
444 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
445 (&this->extra::m_count, 1,
446 get_mpi_datatype(this->extra::m_count),
447 source, tag, comm, m_requests+0));
452 if (m_requests[1] == MPI_REQUEST_NULL) {
453 // Wait for the count message to complete
454 BOOST_MPI_CHECK_RESULT(MPI_Wait,
455 (m_requests, &stat.m_status));
456 // Resize our buffer and get ready to receive its data
457 this->extra::m_values.resize(this->extra::m_count);
458 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
459 (&(this->extra::m_values[0]), this->extra::m_values.size(), get_mpi_datatype<T>(),
460 stat.source(), stat.tag(),
461 MPI_Comm(m_comm), m_requests + 1));
463 // Wait until we have received the entire message
464 BOOST_MPI_CHECK_RESULT(MPI_Wait,
465 (m_requests + 1, &stat.m_status));
469 optional<status> test() {
473 if (m_requests[1] == MPI_REQUEST_NULL) {
474 // Check if the count message has completed
475 BOOST_MPI_CHECK_RESULT(MPI_Test,
476 (m_requests, &flag, &stat.m_status));
478 // Resize our buffer and get ready to receive its data
479 this->extra::m_values.resize(this->extra::m_count);
480 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
481 (&(this->extra::m_values[0]), this->extra::m_values.size(), get_mpi_datatype<T>(),
482 stat.source(), stat.tag(),
483 MPI_Comm(m_comm), m_requests + 1));
485 return optional<status>(); // We have not finished yet
488 // Check if we have received the message data
489 BOOST_MPI_CHECK_RESULT(MPI_Test,
490 (m_requests + 1, &flag, &stat.m_status));
494 return optional<status>();
498 class BOOST_MPI_DECL request::trivial_handler : public request::handler {
504 optional<status> test();
508 optional<MPI_Request&> trivial();
511 friend class request;
512 MPI_Request m_request;
515 class request::dynamic_handler : public request::handler {
519 optional<status> test();
523 optional<MPI_Request&> trivial();
526 friend class request;
527 MPI_Request m_requests[2];
531 request request::make_serialized(communicator const& comm, int source, int tag, T& value) {
532 #if defined(BOOST_MPI_USE_IMPROBE)
533 return request(new probe_handler<detail::serialized_data<T> >(comm, source, tag, value));
535 return request(new legacy_serialized_handler<T>(comm, source, tag, value));
540 request request::make_serialized_array(communicator const& comm, int source, int tag, T* values, int n) {
541 #if defined(BOOST_MPI_USE_IMPROBE)
542 return request(new probe_handler<detail::serialized_array_data<T> >(comm, source, tag, values, n));
544 return request(new legacy_serialized_array_handler<T>(comm, source, tag, values, n));
548 template<typename T, class A>
549 request request::make_dynamic_primitive_array_recv(communicator const& comm, int source, int tag,
550 std::vector<T,A>& values) {
551 #if defined(BOOST_MPI_USE_IMPROBE)
552 return request(new probe_handler<detail::dynamic_primitive_array_data<std::vector<T,A> > >(comm,source,tag,values));
554 return request(new legacy_dynamic_primitive_array_handler<T,A>(comm, source, tag, values));
560 request::make_trivial_send(communicator const& comm, int dest, int tag, T const* values, int n) {
561 trivial_handler* handler = new trivial_handler;
562 BOOST_MPI_CHECK_RESULT(MPI_Isend,
563 (const_cast<T*>(values), n,
564 get_mpi_datatype<T>(),
565 dest, tag, comm, &handler->m_request));
566 return request(handler);
571 request::make_trivial_send(communicator const& comm, int dest, int tag, T const& value) {
572 return make_trivial_send(comm, dest, tag, &value, 1);
577 request::make_trivial_recv(communicator const& comm, int dest, int tag, T* values, int n) {
578 trivial_handler* handler = new trivial_handler;
579 BOOST_MPI_CHECK_RESULT(MPI_Irecv,
581 get_mpi_datatype<T>(),
582 dest, tag, comm, &handler->m_request));
583 return request(handler);
588 request::make_trivial_recv(communicator const& comm, int dest, int tag, T& value) {
589 return make_trivial_recv(comm, dest, tag, &value, 1);
592 template<typename T, class A>
593 request request::make_dynamic_primitive_array_send(communicator const& comm, int dest, int tag,
594 std::vector<T,A> const& values) {
595 #if defined(BOOST_MPI_USE_IMPROBE)
596 return make_trivial_send(comm, dest, tag, values.data(), values.size());
599 // non blocking recv by legacy_dynamic_primitive_array_handler
600 // blocking recv by status recv_vector(source,tag,value,primitive)
601 boost::shared_ptr<std::size_t> size(new std::size_t(values.size()));
602 dynamic_handler* handler = new dynamic_handler;
603 request req(handler);
606 BOOST_MPI_CHECK_RESULT(MPI_Isend,
608 get_mpi_datatype(*size),
609 dest, tag, comm, handler->m_requests+0));
610 BOOST_MPI_CHECK_RESULT(MPI_Isend,
611 (const_cast<T*>(values.data()), *size,
612 get_mpi_datatype<T>(),
613 dest, tag, comm, handler->m_requests+1));
620 request::legacy_handler::legacy_handler(communicator const& comm, int source, int tag)
625 m_requests[0] = MPI_REQUEST_NULL;
626 m_requests[1] = MPI_REQUEST_NULL;
631 #endif // BOOST_MPI_REQUEST_HANDLERS_HPP