2 Copyright (c) 2005-2019 Intel Corporation
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
17 #ifndef __TBB__flow_graph_cache_impl_H
18 #define __TBB__flow_graph_cache_impl_H
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
24 // included in namespace tbb::flow::interfaceX (in flow_graph.h)
// NOTE(review): this listing is elided — the fused original line numbers jump
// (e.g. 29 -> 33), so statements, braces, and method bodies are missing between
// visible lines. Comments below describe only what the visible code shows.
16 //! A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
28 //! A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
29 template< typename T, typename M=spin_mutex >
33 typedef size_t size_type;
// Locked query: forwards to the unlocked internal_empty() helper.
36 typename mutex_type::scoped_lock lock( my_mutex );
37 return internal_empty();
41 typename mutex_type::scoped_lock lock( my_mutex );
// Locked removal: rotates through the queue via internal_pop(); stops as soon
// as the matching element is found (pop without re-push drops it).
46 typename mutex_type::scoped_lock lock( my_mutex );
47 for ( size_t i = internal_size(); i != 0; --i ) {
48 T &s = internal_pop();
49 if ( &s == &n ) return; // only remove one predecessor per request
// Drain the queue of raw pointers; ownership of the pointees is NOT released
// here (my_q stores T*, the cache does not own them).
55 while( !my_q.empty()) (void)my_q.pop();
56 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
57 my_built_predecessors.clear();
// --- deprecated node-extraction support: tracks edges built via make_edge ---
61 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
62 typedef edge_container<T> built_predecessors_type;
63 built_predecessors_type &built_predecessors() { return my_built_predecessors; }
65 typedef typename edge_container<T>::edge_list_type predecessor_list_type;
// Record a predecessor edge under the lock.
66 void internal_add_built_predecessor( T &n ) {
67 typename mutex_type::scoped_lock lock( my_mutex );
68 my_built_predecessors.add_edge(n);
// Remove a recorded predecessor edge under the lock.
71 void internal_delete_built_predecessor( T &n ) {
72 typename mutex_type::scoped_lock lock( my_mutex );
73 my_built_predecessors.delete_edge(n);
// Snapshot the recorded edges into v under the lock.
76 void copy_predecessors( predecessor_list_type &v) {
77 typename mutex_type::scoped_lock lock( my_mutex );
78 my_built_predecessors.copy_edges(v);
81 size_t predecessor_count() {
82 typename mutex_type::scoped_lock lock(my_mutex);
83 return (size_t)(my_built_predecessors.edge_count());
85 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
// Underlying storage: non-owning pointers, FIFO order.
91 std::queue< T * > my_q;
92 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
93 built_predecessors_type my_built_predecessors;
// The internal_* helpers below are unlocked; callers must hold my_mutex.
// (Bodies are not visible in this elided listing.)
96 // Assumes lock is held
97 inline bool internal_empty( ) {
101 // Assumes lock is held
102 inline size_type internal_size( ) {
106 // Assumes lock is held
107 inline void internal_push( T &n ) {
111 // Assumes lock is held
112 inline T &internal_pop() {
// NOTE(review): elided listing — original line numbers jump; loop headers and
// closing braces are missing between visible lines. Comments are hedged.
120 //! A cache of predecessors that only supports try_get
121 template< typename T, typename M=spin_mutex >
122 #if __TBB_PREVIEW_ASYNC_MSG
123 // TODO: make predecessor_cache type T-independent when async_msg becomes regular feature
// With async_msg preview enabled the cache stores type-erased senders;
// otherwise it stores typed sender<T> nodes.
124 class predecessor_cache : public node_cache< untyped_sender, M > {
126 class predecessor_cache : public node_cache< sender<T>, M > {
127 #endif // __TBB_PREVIEW_ASYNC_MSG
129 typedef M mutex_type;
130 typedef T output_type;
131 #if __TBB_PREVIEW_ASYNC_MSG
132 typedef untyped_sender predecessor_type;
133 typedef untyped_receiver successor_type;
135 typedef sender<output_type> predecessor_type;
136 typedef receiver<output_type> successor_type;
137 #endif // __TBB_PREVIEW_ASYNC_MSG
139 predecessor_cache( ) : my_owner( NULL ) { }
// my_owner is the receiver node this cache belongs to; raw pointer, not owned.
141 void set_owner( successor_type *owner ) { my_owner = owner; }
// Pull one item from a cached predecessor. Visible structure: pop a sender
// under the lock, call try_get outside it, and on failure re-register the
// owner as that sender's successor (giving the edge back); repeats while
// msg == false. Parts of the loop are elided here.
143 bool get_item( output_type &v ) {
148 predecessor_type *src;
150 typename mutex_type::scoped_lock lock(this->my_mutex);
151 if ( this->internal_empty() ) {
154 src = &this->internal_pop();
157 // Try to get from this sender
158 msg = src->try_get( v );
161 // Relinquish ownership of the edge
163 src->register_successor( *my_owner );
165 // Retain ownership of the edge
168 } while ( msg == false );
// reset(): drains the cache and reconnects each popped sender to my_owner.
172 // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
176 predecessor_type *src;
178 if (this->internal_empty()) break;
179 src = &this->internal_pop();
181 src->register_successor( *my_owner );
188 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
189 using node_cache< predecessor_type, M >::my_built_predecessors;
191 successor_type *my_owner;
// NOTE(review): elided listing — bodies of try_release/try_consume/reset/clear
// are only partially visible; comments below cover the visible lines only.
194 //! A cache of predecessors that supports requests and reservations
195 // TODO: make reservable_predecessor_cache type T-independent when async_msg becomes regular feature
196 template< typename T, typename M=spin_mutex >
197 class reservable_predecessor_cache : public predecessor_cache< T, M > {
199 typedef M mutex_type;
200 typedef T output_type;
201 #if __TBB_PREVIEW_ASYNC_MSG
202 typedef untyped_sender predecessor_type;
203 typedef untyped_receiver successor_type;
205 typedef sender<T> predecessor_type;
206 typedef receiver<T> successor_type;
207 #endif // __TBB_PREVIEW_ASYNC_MSG
// reserved_src tracks the single sender currently holding a reservation
// (NULL when none) — at most one reservation is outstanding at a time.
209 reservable_predecessor_cache( ) : reserved_src(NULL) { }
// Reserve a value from some cached predecessor. Mirrors
// predecessor_cache::get_item but uses try_reserve and remembers the source
// so try_consume/try_release can complete the two-phase protocol later.
212 try_reserve( output_type &v ) {
// Bail out if a reservation is already held or there is nothing cached.
217 typename mutex_type::scoped_lock lock(this->my_mutex);
218 if ( reserved_src || this->internal_empty() )
221 reserved_src = &this->internal_pop();
224 // Try to get from this sender
225 msg = reserved_src->try_reserve( v );
// On failed reservation: give the edge back to the sender under the lock...
228 typename mutex_type::scoped_lock lock(this->my_mutex);
229 // Relinquish ownership of the edge
230 reserved_src->register_successor( *this->my_owner );
233 // Retain ownership of the edge
234 this->add( *reserved_src );
236 } while ( msg == false );
// Forward release/consume to whichever sender holds the reservation.
243 reserved_src->try_release( );
250 reserved_src->try_consume( );
// reset/clear delegate to the base cache (reservation state handling is
// elided in this listing — presumably reserved_src is also cleared; verify
// against the full file).
257 predecessor_cache<T,M>::reset( );
262 predecessor_cache<T,M>::clear();
266 predecessor_type *reserved_src;
// NOTE(review): elided listing — several closing braces and statements are
// missing between visible lines; comments describe only the visible code.
270 //! An abstract cache of successors
271 // TODO: make successor_cache type T-independent when async_msg becomes regular feature
// Uses a reader-writer mutex by default: reads (empty(), copy) take the lock
// shared (false), mutations take it exclusive (true).
272 template<typename T, typename M=spin_rw_mutex >
273 class successor_cache : tbb::internal::no_copy {
276 typedef M mutex_type;
279 #if __TBB_PREVIEW_ASYNC_MSG
280 typedef untyped_receiver successor_type;
281 typedef untyped_receiver *pointer_type;
282 typedef untyped_sender owner_type;
284 typedef receiver<T> successor_type;
285 typedef receiver<T> *pointer_type;
286 typedef sender<T> owner_type;
287 #endif // __TBB_PREVIEW_ASYNC_MSG
// Successors stored as a list of non-owning pointers.
288 typedef std::list< pointer_type > successors_type;
289 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
290 edge_container<successor_type> my_built_successors;
292 successors_type my_successors;
294 owner_type *my_owner;
// --- deprecated node-extraction support (mirrors node_cache's predecessor
// bookkeeping, but for successor edges; writer lock for mutation, reader
// lock for queries) ---
297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
298 typedef typename edge_container<successor_type>::edge_list_type successor_list_type;
300 edge_container<successor_type> &built_successors() { return my_built_successors; }
302 void internal_add_built_successor( successor_type &r) {
303 typename mutex_type::scoped_lock l(my_mutex, true);
304 my_built_successors.add_edge( r );
307 void internal_delete_built_successor( successor_type &r) {
308 typename mutex_type::scoped_lock l(my_mutex, true);
309 my_built_successors.delete_edge(r);
312 void copy_successors( successor_list_type &v) {
313 typename mutex_type::scoped_lock l(my_mutex, false);
314 my_built_successors.copy_edges(v);
317 size_t successor_count() {
318 typename mutex_type::scoped_lock l(my_mutex,false);
319 return my_built_successors.edge_count();
322 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
324 successor_cache( ) : my_owner(NULL) {}
// Owner is the sender node this cache belongs to; raw pointer, not owned.
326 void set_owner( owner_type *owner ) { my_owner = owner; }
328 virtual ~successor_cache() {}
// Append a successor under the writer lock.
330 void register_successor( successor_type &r ) {
331 typename mutex_type::scoped_lock l(my_mutex, true);
332 my_successors.push_back( &r );
// Linear search + erase of the first matching successor (match condition is
// elided in this listing) under the writer lock.
335 void remove_successor( successor_type &r ) {
336 typename mutex_type::scoped_lock l(my_mutex, true);
337 for ( typename successors_type::iterator i = my_successors.begin();
338 i != my_successors.end(); ++i ) {
340 my_successors.erase(i);
// empty() only needs the reader lock.
347 typename mutex_type::scoped_lock l(my_mutex, false);
348 return my_successors.empty();
352 my_successors.clear();
353 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
354 my_built_successors.clear();
// Pure virtual hook: concrete caches (broadcast/round-robin) implement the
// actual delivery policy.
358 #if !__TBB_PREVIEW_ASYNC_MSG
359 virtual task * try_put_task( const T &t ) = 0;
360 #endif // __TBB_PREVIEW_ASYNC_MSG
361 }; // successor_cache<T>
// NOTE(review): elided listing — comments below cover visible lines only.
363 //! An abstract cache of successors, specialized to continue_msg
// Full specialization: unlike the primary template, registering/removing a
// successor also maintains the reverse predecessor link on continue receivers.
365 class successor_cache< continue_msg > : tbb::internal::no_copy {
368 typedef spin_rw_mutex mutex_type;
371 #if __TBB_PREVIEW_ASYNC_MSG
372 typedef untyped_receiver successor_type;
373 typedef untyped_receiver *pointer_type;
375 typedef receiver<continue_msg> successor_type;
376 typedef receiver<continue_msg> *pointer_type;
377 #endif // __TBB_PREVIEW_ASYNC_MSG
378 typedef std::list< pointer_type > successors_type;
379 successors_type my_successors;
380 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
381 edge_container<successor_type> my_built_successors;
382 typedef edge_container<successor_type>::edge_list_type successor_list_type;
385 sender<continue_msg> *my_owner;
// --- deprecated node-extraction support (same pattern as the primary
// template: writer lock to mutate, reader lock to query) ---
389 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
391 edge_container<successor_type> &built_successors() { return my_built_successors; }
393 void internal_add_built_successor( successor_type &r) {
394 mutex_type::scoped_lock l(my_mutex, true);
395 my_built_successors.add_edge( r );
398 void internal_delete_built_successor( successor_type &r) {
399 mutex_type::scoped_lock l(my_mutex, true);
400 my_built_successors.delete_edge(r);
403 void copy_successors( successor_list_type &v) {
404 mutex_type::scoped_lock l(my_mutex, false);
405 my_built_successors.copy_edges(v);
408 size_t successor_count() {
409 mutex_type::scoped_lock l(my_mutex,false);
410 return my_built_successors.edge_count();
413 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
415 successor_cache( ) : my_owner(NULL) {}
417 void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
419 virtual ~successor_cache() {}
// Register a successor; continue receivers additionally get the owner wired
// in as a predecessor so they can count incoming continue_msg signals.
421 void register_successor( successor_type &r ) {
422 mutex_type::scoped_lock l(my_mutex, true);
423 my_successors.push_back( &r );
424 if ( my_owner && r.is_continue_receiver() ) {
425 r.register_predecessor( *my_owner );
// Remove a successor and undo the predecessor link (erase condition is
// elided in this listing).
429 void remove_successor( successor_type &r ) {
430 mutex_type::scoped_lock l(my_mutex, true);
431 for ( successors_type::iterator i = my_successors.begin();
432 i != my_successors.end(); ++i ) {
434 // TODO: Check if we need to test for continue_receiver before
437 r.remove_predecessor( *my_owner );
438 my_successors.erase(i);
// empty() only needs the reader lock.
445 mutex_type::scoped_lock l(my_mutex, false);
446 return my_successors.empty();
450 my_successors.clear();
451 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
452 my_built_successors.clear();
// Pure virtual delivery hook, as in the primary template.
456 #if !__TBB_PREVIEW_ASYNC_MSG
457 virtual task * try_put_task( const continue_msg &t ) = 0;
458 #endif // __TBB_PREVIEW_ASYNC_MSG
460 }; // successor_cache< continue_msg >
// NOTE(review): elided listing — the branch structure around register_predecessor
// and loop closings are partially missing; comments cover visible lines only.
462 //! A cache of successors that are broadcast to
463 // TODO: make broadcast_cache type T-independent when async_msg becomes regular feature
464 template<typename T, typename M=spin_rw_mutex>
465 class broadcast_cache : public successor_cache<T, M> {
466 typedef M mutex_type;
467 typedef typename successor_cache<T,M>::successors_type successors_type;
471 broadcast_cache( ) {}
473 // as above, but call try_put_task instead, and return the last task we received (if any)
// Under async_msg preview this is a template over the message type X;
// otherwise it overrides the base's pure virtual for T.
474 #if __TBB_PREVIEW_ASYNC_MSG
476 task * try_put_task( const X &t ) {
478 task * try_put_task( const T &t ) __TBB_override {
479 #endif // __TBB_PREVIEW_ASYNC_MSG
480 task * last_task = NULL;
// Lock starts as writer ('upgraded' true) per the visible initialization;
// the upgrade_to_writer() below suggests it can be downgraded/reacquired in
// elided code — verify against the full file.
481 bool upgraded = true;
482 typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
483 typename successors_type::iterator i = this->my_successors.begin();
// Offer t to every successor, folding each returned task into last_task.
484 while ( i != this->my_successors.end() ) {
485 task *new_task = (*i)->try_put_task(t);
486 // workaround for icc bug
487 graph& graph_ref = (*i)->graph_reference();
488 last_task = combine_tasks(graph_ref, last_task, new_task); // enqueue if necessary
// If the successor would rather pull (registers our owner as predecessor),
// drop it from the broadcast list under the writer lock.
493 if ( (*i)->register_predecessor(*this->my_owner) ) {
495 l.upgrade_to_writer();
498 i = this->my_successors.erase(i);
507 // call try_put_task and return list of received tasks
508 #if __TBB_PREVIEW_ASYNC_MSG
510 bool gather_successful_try_puts( const X &t, task_list &tasks ) {
512 bool gather_successful_try_puts( const T &t, task_list &tasks ) {
513 #endif // __TBB_PREVIEW_ASYNC_MSG
514 bool upgraded = true;
515 bool is_at_least_one_put_successful = false;
516 typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
517 typename successors_type::iterator i = this->my_successors.begin();
// Same traversal as try_put_task, but collects concrete tasks into 'tasks'
// (SUCCESSFULLY_ENQUEUED is a sentinel, not a real task, so it is skipped)
// and reports whether any put succeeded.
518 while ( i != this->my_successors.end() ) {
519 task * new_task = (*i)->try_put_task(t);
522 if(new_task != SUCCESSFULLY_ENQUEUED) {
523 tasks.push_back(*new_task);
525 is_at_least_one_put_successful = true;
528 if ( (*i)->register_predecessor(*this->my_owner) ) {
530 l.upgrade_to_writer();
533 i = this->my_successors.erase(i);
539 return is_at_least_one_put_successful;
// NOTE(review): elided listing — the early-return after a successful put (what
// makes this round-robin rather than broadcast) is not visible here; comments
// cover only the visible lines.
543 //! A cache of successors that are put in a round-robin fashion
544 // TODO: make round_robin_cache type T-independent when async_msg becomes regular feature
545 template<typename T, typename M=spin_rw_mutex >
546 class round_robin_cache : public successor_cache<T, M> {
547 typedef size_t size_type;
548 typedef M mutex_type;
549 typedef typename successor_cache<T,M>::successors_type successors_type;
553 round_robin_cache( ) {}
// Size query under the reader lock.
556 typename mutex_type::scoped_lock l(this->my_mutex, false);
557 return this->my_successors.size();
560 #if __TBB_PREVIEW_ASYNC_MSG
562 task * try_put_task( const X &t ) {
564 task *try_put_task( const T &t ) __TBB_override {
565 #endif // __TBB_PREVIEW_ASYNC_MSG
566 bool upgraded = true;
567 typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
568 typename successors_type::iterator i = this->my_successors.begin();
// Walk successors, offering t; pull-preferring successors are removed from
// the list under the writer lock (same pattern as broadcast_cache).
569 while ( i != this->my_successors.end() ) {
570 task *new_task = (*i)->try_put_task(t);
574 if ( (*i)->register_predecessor(*this->my_owner) ) {
576 l.upgrade_to_writer();
579 i = this->my_successors.erase(i);
590 } // namespace internal
590 } // namespace internal
592 #endif // __TBB__flow_graph_cache_impl_H