Committing TBB 2019 Update 9 source code
[platform/upstream/tbb.git] / include / tbb / internal / _flow_graph_cache_impl.h
1 /*
2     Copyright (c) 2005-2019 Intel Corporation
3
4     Licensed under the Apache License, Version 2.0 (the "License");
5     you may not use this file except in compliance with the License.
6     You may obtain a copy of the License at
7
8         http://www.apache.org/licenses/LICENSE-2.0
9
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15 */
16
17 #ifndef __TBB__flow_graph_cache_impl_H
18 #define __TBB__flow_graph_cache_impl_H
19
20 #ifndef __TBB_flow_graph_H
21 #error Do not #include this internal file directly; use public TBB headers instead.
22 #endif
23
24 // included in namespace tbb::flow::interfaceX (in flow_graph.h)
25
26 namespace internal {
27
28 //! A node_cache maintains a std::queue of elements of type T.  Each operation is protected by a lock.
29 template< typename T, typename M=spin_mutex >
30 class node_cache {
31     public:
32
33     typedef size_t size_type;
34
35     bool empty() {
36         typename mutex_type::scoped_lock lock( my_mutex );
37         return internal_empty();
38     }
39
40     void add( T &n ) {
41         typename mutex_type::scoped_lock lock( my_mutex );
42         internal_push(n);
43     }
44
45     void remove( T &n ) {
46         typename mutex_type::scoped_lock lock( my_mutex );
47         for ( size_t i = internal_size(); i != 0; --i ) {
48             T &s = internal_pop();
49             if ( &s == &n )  return;  // only remove one predecessor per request
50             internal_push(s);
51         }
52     }
53
54     void clear() {
55         while( !my_q.empty()) (void)my_q.pop();
56 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
57         my_built_predecessors.clear();
58 #endif
59     }
60
61 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
62     typedef edge_container<T> built_predecessors_type;
63     built_predecessors_type &built_predecessors() { return my_built_predecessors; }
64
65     typedef typename edge_container<T>::edge_list_type predecessor_list_type;
66     void internal_add_built_predecessor( T &n ) {
67         typename mutex_type::scoped_lock lock( my_mutex );
68         my_built_predecessors.add_edge(n);
69     }
70
71     void internal_delete_built_predecessor( T &n ) {
72         typename mutex_type::scoped_lock lock( my_mutex );
73         my_built_predecessors.delete_edge(n);
74     }
75
76     void copy_predecessors( predecessor_list_type &v) {
77         typename mutex_type::scoped_lock lock( my_mutex );
78         my_built_predecessors.copy_edges(v);
79     }
80
81     size_t predecessor_count() {
82         typename mutex_type::scoped_lock lock(my_mutex);
83         return (size_t)(my_built_predecessors.edge_count());
84     }
85 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
86
87 protected:
88
89     typedef M mutex_type;
90     mutex_type my_mutex;
91     std::queue< T * > my_q;
92 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
93     built_predecessors_type my_built_predecessors;
94 #endif
95
96     // Assumes lock is held
97     inline bool internal_empty( )  {
98         return my_q.empty();
99     }
100
101     // Assumes lock is held
102     inline size_type internal_size( )  {
103         return my_q.size();
104     }
105
106     // Assumes lock is held
107     inline void internal_push( T &n )  {
108         my_q.push(&n);
109     }
110
111     // Assumes lock is held
112     inline T &internal_pop() {
113         T *v = my_q.front();
114         my_q.pop();
115         return *v;
116     }
117
118 };
119
120 //! A cache of predecessors that only supports try_get
121 template< typename T, typename M=spin_mutex >
122 #if __TBB_PREVIEW_ASYNC_MSG
123 // TODO: make predecessor_cache type T-independent when async_msg becomes regular feature
124 class predecessor_cache : public node_cache< untyped_sender, M > {
125 #else
126 class predecessor_cache : public node_cache< sender<T>, M > {
127 #endif // __TBB_PREVIEW_ASYNC_MSG
128 public:
129     typedef M mutex_type;
130     typedef T output_type;
131 #if __TBB_PREVIEW_ASYNC_MSG
132     typedef untyped_sender predecessor_type;
133     typedef untyped_receiver successor_type;
134 #else
135     typedef sender<output_type> predecessor_type;
136     typedef receiver<output_type> successor_type;
137 #endif // __TBB_PREVIEW_ASYNC_MSG
138
139     predecessor_cache( ) : my_owner( NULL ) { }
140
141     void set_owner( successor_type *owner ) { my_owner = owner; }
142
143     bool get_item( output_type &v ) {
144
145         bool msg = false;
146
147         do {
148             predecessor_type *src;
149             {
150                 typename mutex_type::scoped_lock lock(this->my_mutex);
151                 if ( this->internal_empty() ) {
152                     break;
153                 }
154                 src = &this->internal_pop();
155             }
156
157             // Try to get from this sender
158             msg = src->try_get( v );
159
160             if (msg == false) {
161                 // Relinquish ownership of the edge
162                 if (my_owner)
163                     src->register_successor( *my_owner );
164             } else {
165                 // Retain ownership of the edge
166                 this->add(*src);
167             }
168         } while ( msg == false );
169         return msg;
170     }
171
172     // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
173     void reset() {
174         if (my_owner) {
175             for(;;) {
176                 predecessor_type *src;
177                 {
178                     if (this->internal_empty()) break;
179                     src = &this->internal_pop();
180                 }
181                 src->register_successor( *my_owner );
182             }
183         }
184     }
185
186 protected:
187
188 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
189     using node_cache< predecessor_type, M >::my_built_predecessors;
190 #endif
191     successor_type *my_owner;
192 };
193
194 //! An cache of predecessors that supports requests and reservations
195 // TODO: make reservable_predecessor_cache type T-independent when async_msg becomes regular feature
196 template< typename T, typename M=spin_mutex >
197 class reservable_predecessor_cache : public predecessor_cache< T, M > {
198 public:
199     typedef M mutex_type;
200     typedef T output_type;
201 #if __TBB_PREVIEW_ASYNC_MSG
202     typedef untyped_sender predecessor_type;
203     typedef untyped_receiver successor_type;
204 #else
205     typedef sender<T> predecessor_type;
206     typedef receiver<T> successor_type;
207 #endif // __TBB_PREVIEW_ASYNC_MSG
208
209     reservable_predecessor_cache( ) : reserved_src(NULL) { }
210
211     bool
212     try_reserve( output_type &v ) {
213         bool msg = false;
214
215         do {
216             {
217                 typename mutex_type::scoped_lock lock(this->my_mutex);
218                 if ( reserved_src || this->internal_empty() )
219                     return false;
220
221                 reserved_src = &this->internal_pop();
222             }
223
224             // Try to get from this sender
225             msg = reserved_src->try_reserve( v );
226
227             if (msg == false) {
228                 typename mutex_type::scoped_lock lock(this->my_mutex);
229                 // Relinquish ownership of the edge
230                 reserved_src->register_successor( *this->my_owner );
231                 reserved_src = NULL;
232             } else {
233                 // Retain ownership of the edge
234                 this->add( *reserved_src );
235             }
236         } while ( msg == false );
237
238         return msg;
239     }
240
241     bool
242     try_release( ) {
243         reserved_src->try_release( );
244         reserved_src = NULL;
245         return true;
246     }
247
248     bool
249     try_consume( ) {
250         reserved_src->try_consume( );
251         reserved_src = NULL;
252         return true;
253     }
254
255     void reset( ) {
256         reserved_src = NULL;
257         predecessor_cache<T,M>::reset( );
258     }
259
260     void clear() {
261         reserved_src = NULL;
262         predecessor_cache<T,M>::clear();
263     }
264
265 private:
266     predecessor_type *reserved_src;
267 };
268
269
270 //! An abstract cache of successors
271 // TODO: make successor_cache type T-independent when async_msg becomes regular feature
272 template<typename T, typename M=spin_rw_mutex >
273 class successor_cache : tbb::internal::no_copy {
274 protected:
275
276     typedef M mutex_type;
277     mutex_type my_mutex;
278
279 #if __TBB_PREVIEW_ASYNC_MSG
280     typedef untyped_receiver successor_type;
281     typedef untyped_receiver *pointer_type;
282     typedef untyped_sender owner_type;
283 #else
284     typedef receiver<T> successor_type;
285     typedef receiver<T> *pointer_type;
286     typedef sender<T> owner_type;
287 #endif // __TBB_PREVIEW_ASYNC_MSG
288     typedef std::list< pointer_type > successors_type;
289 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
290     edge_container<successor_type> my_built_successors;
291 #endif
292     successors_type my_successors;
293
294     owner_type *my_owner;
295
296 public:
297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
298     typedef typename edge_container<successor_type>::edge_list_type successor_list_type;
299
300     edge_container<successor_type> &built_successors() { return my_built_successors; }
301
302     void internal_add_built_successor( successor_type &r) {
303         typename mutex_type::scoped_lock l(my_mutex, true);
304         my_built_successors.add_edge( r );
305     }
306
307     void internal_delete_built_successor( successor_type &r) {
308         typename mutex_type::scoped_lock l(my_mutex, true);
309         my_built_successors.delete_edge(r);
310     }
311
312     void copy_successors( successor_list_type &v) {
313         typename mutex_type::scoped_lock l(my_mutex, false);
314         my_built_successors.copy_edges(v);
315     }
316
317     size_t successor_count() {
318         typename mutex_type::scoped_lock l(my_mutex,false);
319         return my_built_successors.edge_count();
320     }
321
322 #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
323
324     successor_cache( ) : my_owner(NULL) {}
325
326     void set_owner( owner_type *owner ) { my_owner = owner; }
327
328     virtual ~successor_cache() {}
329
330     void register_successor( successor_type &r ) {
331         typename mutex_type::scoped_lock l(my_mutex, true);
332         my_successors.push_back( &r );
333     }
334
335     void remove_successor( successor_type &r ) {
336         typename mutex_type::scoped_lock l(my_mutex, true);
337         for ( typename successors_type::iterator i = my_successors.begin();
338               i != my_successors.end(); ++i ) {
339             if ( *i == & r ) {
340                 my_successors.erase(i);
341                 break;
342             }
343         }
344     }
345
346     bool empty() {
347         typename mutex_type::scoped_lock l(my_mutex, false);
348         return my_successors.empty();
349     }
350
351     void clear() {
352         my_successors.clear();
353 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
354         my_built_successors.clear();
355 #endif
356     }
357
358 #if !__TBB_PREVIEW_ASYNC_MSG
359     virtual task * try_put_task( const T &t ) = 0;
360 #endif // __TBB_PREVIEW_ASYNC_MSG
361  };  // successor_cache<T>
362
363 //! An abstract cache of successors, specialized to continue_msg
364 template<typename M>
365 class successor_cache< continue_msg, M > : tbb::internal::no_copy {
366 protected:
367
368     typedef M mutex_type;
369     mutex_type my_mutex;
370
371 #if __TBB_PREVIEW_ASYNC_MSG
372     typedef untyped_receiver successor_type;
373     typedef untyped_receiver *pointer_type;
374 #else
375     typedef receiver<continue_msg> successor_type;
376     typedef receiver<continue_msg> *pointer_type;
377 #endif // __TBB_PREVIEW_ASYNC_MSG
378     typedef std::list< pointer_type > successors_type;
379     successors_type my_successors;
380 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
381     edge_container<successor_type> my_built_successors;
382     typedef edge_container<successor_type>::edge_list_type successor_list_type;
383 #endif
384
385     sender<continue_msg> *my_owner;
386
387 public:
388
389 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
390
391     edge_container<successor_type> &built_successors() { return my_built_successors; }
392
393     void internal_add_built_successor( successor_type &r) {
394         typename mutex_type::scoped_lock l(my_mutex, true);
395         my_built_successors.add_edge( r );
396     }
397
398     void internal_delete_built_successor( successor_type &r) {
399         typename mutex_type::scoped_lock l(my_mutex, true);
400         my_built_successors.delete_edge(r);
401     }
402
403     void copy_successors( successor_list_type &v) {
404         typename mutex_type::scoped_lock l(my_mutex, false);
405         my_built_successors.copy_edges(v);
406     }
407
408     size_t successor_count() {
409         typename mutex_type::scoped_lock l(my_mutex,false);
410         return my_built_successors.edge_count();
411     }
412
413 #endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
414
415     successor_cache( ) : my_owner(NULL) {}
416
417     void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
418
419     virtual ~successor_cache() {}
420
421     void register_successor( successor_type &r ) {
422         typename mutex_type::scoped_lock l(my_mutex, true);
423         my_successors.push_back( &r );
424         if ( my_owner && r.is_continue_receiver() ) {
425             r.register_predecessor( *my_owner );
426         }
427     }
428
429     void remove_successor( successor_type &r ) {
430         typename mutex_type::scoped_lock l(my_mutex, true);
431         for ( successors_type::iterator i = my_successors.begin();
432               i != my_successors.end(); ++i ) {
433             if ( *i == & r ) {
434                 // TODO: Check if we need to test for continue_receiver before
435                 // removing from r.
436                 if ( my_owner )
437                     r.remove_predecessor( *my_owner );
438                 my_successors.erase(i);
439                 break;
440             }
441         }
442     }
443
444     bool empty() {
445         typename mutex_type::scoped_lock l(my_mutex, false);
446         return my_successors.empty();
447     }
448
449     void clear() {
450         my_successors.clear();
451 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
452         my_built_successors.clear();
453 #endif
454     }
455
456 #if !__TBB_PREVIEW_ASYNC_MSG
457     virtual task * try_put_task( const continue_msg &t ) = 0;
458 #endif // __TBB_PREVIEW_ASYNC_MSG
459
460 };  // successor_cache< continue_msg >
461
462 //! A cache of successors that are broadcast to
463 // TODO: make broadcast_cache type T-independent when async_msg becomes regular feature
464 template<typename T, typename M=spin_rw_mutex>
465 class broadcast_cache : public successor_cache<T, M> {
466     typedef M mutex_type;
467     typedef typename successor_cache<T,M>::successors_type successors_type;
468
469 public:
470
471     broadcast_cache( ) {}
472
473     // as above, but call try_put_task instead, and return the last task we received (if any)
474 #if __TBB_PREVIEW_ASYNC_MSG
475     template<typename X>
476     task * try_put_task( const X &t ) {
477 #else
478     task * try_put_task( const T &t ) __TBB_override {
479 #endif // __TBB_PREVIEW_ASYNC_MSG
480         task * last_task = NULL;
481         bool upgraded = true;
482         typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
483         typename successors_type::iterator i = this->my_successors.begin();
484         while ( i != this->my_successors.end() ) {
485             task *new_task = (*i)->try_put_task(t);
486             // workaround for icc bug
487             graph& graph_ref = (*i)->graph_reference();
488             last_task = combine_tasks(graph_ref, last_task, new_task);  // enqueue if necessary
489             if(new_task) {
490                 ++i;
491             }
492             else {  // failed
493                 if ( (*i)->register_predecessor(*this->my_owner) ) {
494                     if (!upgraded) {
495                         l.upgrade_to_writer();
496                         upgraded = true;
497                     }
498                     i = this->my_successors.erase(i);
499                 } else {
500                     ++i;
501                 }
502             }
503         }
504         return last_task;
505     }
506
507     // call try_put_task and return list of received tasks
508 #if __TBB_PREVIEW_ASYNC_MSG
509     template<typename X>
510     bool gather_successful_try_puts( const X &t, task_list &tasks ) {
511 #else
512     bool gather_successful_try_puts( const T &t, task_list &tasks ) {
513 #endif // __TBB_PREVIEW_ASYNC_MSG
514         bool upgraded = true;
515         bool is_at_least_one_put_successful = false;
516         typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
517         typename successors_type::iterator i = this->my_successors.begin();
518         while ( i != this->my_successors.end() ) {
519             task * new_task = (*i)->try_put_task(t);
520             if(new_task) {
521                 ++i;
522                 if(new_task != SUCCESSFULLY_ENQUEUED) {
523                     tasks.push_back(*new_task);
524                 }
525                 is_at_least_one_put_successful = true;
526             }
527             else {  // failed
528                 if ( (*i)->register_predecessor(*this->my_owner) ) {
529                     if (!upgraded) {
530                         l.upgrade_to_writer();
531                         upgraded = true;
532                     }
533                     i = this->my_successors.erase(i);
534                 } else {
535                     ++i;
536                 }
537             }
538         }
539         return is_at_least_one_put_successful;
540     }
541 };
542
543 //! A cache of successors that are put in a round-robin fashion
544 // TODO: make round_robin_cache type T-independent when async_msg becomes regular feature
545 template<typename T, typename M=spin_rw_mutex >
546 class round_robin_cache : public successor_cache<T, M> {
547     typedef size_t size_type;
548     typedef M mutex_type;
549     typedef typename successor_cache<T,M>::successors_type successors_type;
550
551 public:
552
553     round_robin_cache( ) {}
554
555     size_type size() {
556         typename mutex_type::scoped_lock l(this->my_mutex, false);
557         return this->my_successors.size();
558     }
559
560 #if __TBB_PREVIEW_ASYNC_MSG
561     template<typename X>
562     task * try_put_task( const X &t ) {
563 #else
564     task *try_put_task( const T &t ) __TBB_override {
565 #endif // __TBB_PREVIEW_ASYNC_MSG
566         bool upgraded = true;
567         typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
568         typename successors_type::iterator i = this->my_successors.begin();
569         while ( i != this->my_successors.end() ) {
570             task *new_task = (*i)->try_put_task(t);
571             if ( new_task ) {
572                 return new_task;
573             } else {
574                if ( (*i)->register_predecessor(*this->my_owner) ) {
575                    if (!upgraded) {
576                        l.upgrade_to_writer();
577                        upgraded = true;
578                    }
579                    i = this->my_successors.erase(i);
580                }
581                else {
582                    ++i;
583                }
584             }
585         }
586         return NULL;
587     }
588 };
589
590 } // namespace internal
591
592 #endif // __TBB__flow_graph_cache_impl_H