resolve cyclic dependency with zstd
[platform/upstream/cmake.git] / Source / cmWorkerPool.cxx
1 /* Distributed under the OSI-approved BSD 3-Clause License.  See accompanying
2    file Copyright.txt or https://cmake.org/licensing for details.  */
3 #include "cmWorkerPool.h"
4
5 #include <algorithm>
6 #include <array>
7 #include <condition_variable>
8 #include <cstddef>
9 #include <deque>
10 #include <functional>
11 #include <mutex>
12 #include <thread>
13
14 #include <cm/memory>
15
16 #include <cm3p/uv.h>
17
18 #include "cmRange.h"
19 #include "cmStringAlgorithms.h"
20 #include "cmUVHandlePtr.h"
21 #include "cmUVSignalHackRAII.h" // IWYU pragma: keep
22
23 /**
24  * @brief libuv pipe buffer class
25  */
26 class cmUVPipeBuffer
27 {
28 public:
29   using DataRange = cmRange<const char*>;
30   using DataFunction = std::function<void(DataRange)>;
31   /// On error the ssize_t argument is a non zero libuv error code
32   using EndFunction = std::function<void(ssize_t)>;
33
34   /**
35    * Reset to construction state
36    */
37   void reset();
38
39   /**
40    * Initializes uv_pipe(), uv_stream() and uv_handle()
41    * @return true on success
42    */
43   bool init(uv_loop_t* uv_loop);
44
45   /**
46    * Start reading
47    * @return true on success
48    */
49   bool startRead(DataFunction dataFunction, EndFunction endFunction);
50
51   //! libuv pipe
52   uv_pipe_t* uv_pipe() const { return this->UVPipe_.get(); }
53   //! uv_pipe() casted to libuv stream
54   uv_stream_t* uv_stream() const
55   {
56     return static_cast<uv_stream_t*>(this->UVPipe_);
57   }
58   //! uv_pipe() casted to libuv handle
59   uv_handle_t* uv_handle() { return static_cast<uv_handle_t*>(this->UVPipe_); }
60
61 private:
62   // -- Libuv callbacks
63   static void UVAlloc(uv_handle_t* handle, size_t suggestedSize,
64                       uv_buf_t* buf);
65   static void UVData(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf);
66
67   cm::uv_pipe_ptr UVPipe_;
68   std::vector<char> Buffer_;
69   DataFunction DataFunction_;
70   EndFunction EndFunction_;
71 };
72
73 void cmUVPipeBuffer::reset()
74 {
75   if (this->UVPipe_.get() != nullptr) {
76     this->EndFunction_ = nullptr;
77     this->DataFunction_ = nullptr;
78     this->Buffer_.clear();
79     this->Buffer_.shrink_to_fit();
80     this->UVPipe_.reset();
81   }
82 }
83
84 bool cmUVPipeBuffer::init(uv_loop_t* uv_loop)
85 {
86   this->reset();
87   if (uv_loop == nullptr) {
88     return false;
89   }
90   int ret = this->UVPipe_.init(*uv_loop, 0, this);
91   return (ret == 0);
92 }
93
94 bool cmUVPipeBuffer::startRead(DataFunction dataFunction,
95                                EndFunction endFunction)
96 {
97   if (this->UVPipe_.get() == nullptr) {
98     return false;
99   }
100   if (!dataFunction || !endFunction) {
101     return false;
102   }
103   this->DataFunction_ = std::move(dataFunction);
104   this->EndFunction_ = std::move(endFunction);
105   int ret = uv_read_start(this->uv_stream(), &cmUVPipeBuffer::UVAlloc,
106                           &cmUVPipeBuffer::UVData);
107   return (ret == 0);
108 }
109
110 void cmUVPipeBuffer::UVAlloc(uv_handle_t* handle, size_t suggestedSize,
111                              uv_buf_t* buf)
112 {
113   auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(handle->data);
114   pipe.Buffer_.resize(suggestedSize);
115   buf->base = pipe.Buffer_.data();
116   buf->len = static_cast<unsigned long>(pipe.Buffer_.size());
117 }
118
119 void cmUVPipeBuffer::UVData(uv_stream_t* stream, ssize_t nread,
120                             const uv_buf_t* buf)
121 {
122   auto& pipe = *reinterpret_cast<cmUVPipeBuffer*>(stream->data);
123   if (nread > 0) {
124     if (buf->base != nullptr) {
125       // Call data function
126       pipe.DataFunction_(DataRange(buf->base, buf->base + nread));
127     }
128   } else if (nread < 0) {
129     // Save the end function on the stack before resetting the pipe
130     EndFunction efunc;
131     efunc.swap(pipe.EndFunction_);
132     // Reset pipe before calling the end function
133     pipe.reset();
134     // Call end function
135     efunc((nread == UV_EOF) ? 0 : nread);
136   }
137 }
138
139 /**
140  * @brief External process management class
141  */
142 class cmUVReadOnlyProcess
143 {
144 public:
145   // -- Types
146   //! @brief Process settings
147   struct SetupT
148   {
149     std::string WorkingDirectory;
150     std::vector<std::string> Command;
151     cmWorkerPool::ProcessResultT* Result = nullptr;
152     bool MergedOutput = false;
153   };
154
155   // -- Const accessors
156   SetupT const& Setup() const { return this->Setup_; }
157   cmWorkerPool::ProcessResultT* Result() const { return this->Setup_.Result; }
158   bool IsStarted() const { return this->IsStarted_; }
159   bool IsFinished() const { return this->IsFinished_; }
160
161   // -- Runtime
162   void setup(cmWorkerPool::ProcessResultT* result, bool mergedOutput,
163              std::vector<std::string> const& command,
164              std::string const& workingDirectory = std::string());
165   bool start(uv_loop_t* uv_loop, std::function<void()> finishedCallback);
166
167 private:
168   // -- Libuv callbacks
169   static void UVExit(uv_process_t* handle, int64_t exitStatus, int termSignal);
170   void UVPipeOutData(cmUVPipeBuffer::DataRange data) const;
171   void UVPipeOutEnd(ssize_t error);
172   void UVPipeErrData(cmUVPipeBuffer::DataRange data) const;
173   void UVPipeErrEnd(ssize_t error);
174   void UVTryFinish();
175
176   // -- Setup
177   SetupT Setup_;
178   // -- Runtime
179   bool IsStarted_ = false;
180   bool IsFinished_ = false;
181   std::function<void()> FinishedCallback_;
182   std::vector<const char*> CommandPtr_;
183   std::array<uv_stdio_container_t, 3> UVOptionsStdIO_;
184   uv_process_options_t UVOptions_;
185   cm::uv_process_ptr UVProcess_;
186   cmUVPipeBuffer UVPipeOut_;
187   cmUVPipeBuffer UVPipeErr_;
188 };
189
190 void cmUVReadOnlyProcess::setup(cmWorkerPool::ProcessResultT* result,
191                                 bool mergedOutput,
192                                 std::vector<std::string> const& command,
193                                 std::string const& workingDirectory)
194 {
195   this->Setup_.WorkingDirectory = workingDirectory;
196   this->Setup_.Command = command;
197   this->Setup_.Result = result;
198   this->Setup_.MergedOutput = mergedOutput;
199 }
200
201 bool cmUVReadOnlyProcess::start(uv_loop_t* uv_loop,
202                                 std::function<void()> finishedCallback)
203 {
204   if (this->IsStarted() || (this->Result() == nullptr)) {
205     return false;
206   }
207
208   // Reset result before the start
209   this->Result()->reset();
210
211   // Fill command string pointers
212   if (!this->Setup().Command.empty()) {
213     this->CommandPtr_.reserve(this->Setup().Command.size() + 1);
214     for (std::string const& arg : this->Setup().Command) {
215       this->CommandPtr_.push_back(arg.c_str());
216     }
217     this->CommandPtr_.push_back(nullptr);
218   } else {
219     this->Result()->ErrorMessage = "Empty command";
220   }
221
222   if (!this->Result()->error()) {
223     if (!this->UVPipeOut_.init(uv_loop)) {
224       this->Result()->ErrorMessage = "libuv stdout pipe initialization failed";
225     }
226   }
227   if (!this->Result()->error()) {
228     if (!this->UVPipeErr_.init(uv_loop)) {
229       this->Result()->ErrorMessage = "libuv stderr pipe initialization failed";
230     }
231   }
232   if (!this->Result()->error()) {
233     // -- Setup process stdio options
234     // stdin
235     this->UVOptionsStdIO_[0].flags = UV_IGNORE;
236     this->UVOptionsStdIO_[0].data.stream = nullptr;
237     // stdout
238     this->UVOptionsStdIO_[1].flags =
239       static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
240     this->UVOptionsStdIO_[1].data.stream = this->UVPipeOut_.uv_stream();
241     // stderr
242     this->UVOptionsStdIO_[2].flags =
243       static_cast<uv_stdio_flags>(UV_CREATE_PIPE | UV_WRITABLE_PIPE);
244     this->UVOptionsStdIO_[2].data.stream = this->UVPipeErr_.uv_stream();
245
246     // -- Setup process options
247     std::fill_n(reinterpret_cast<char*>(&this->UVOptions_),
248                 sizeof(this->UVOptions_), 0);
249     this->UVOptions_.exit_cb = &cmUVReadOnlyProcess::UVExit;
250     this->UVOptions_.file = this->CommandPtr_[0];
251     this->UVOptions_.args = const_cast<char**>(this->CommandPtr_.data());
252     this->UVOptions_.cwd = this->Setup_.WorkingDirectory.c_str();
253     this->UVOptions_.flags = UV_PROCESS_WINDOWS_HIDE;
254     this->UVOptions_.stdio_count =
255       static_cast<int>(this->UVOptionsStdIO_.size());
256     this->UVOptions_.stdio = this->UVOptionsStdIO_.data();
257
258     // -- Spawn process
259     int uvErrorCode = this->UVProcess_.spawn(*uv_loop, this->UVOptions_, this);
260     if (uvErrorCode != 0) {
261       this->Result()->ErrorMessage = "libuv process spawn failed";
262       if (const char* uvErr = uv_strerror(uvErrorCode)) {
263         this->Result()->ErrorMessage += ": ";
264         this->Result()->ErrorMessage += uvErr;
265       }
266     }
267   }
268   // -- Start reading from stdio streams
269   if (!this->Result()->error()) {
270     if (!this->UVPipeOut_.startRead(
271           [this](cmUVPipeBuffer::DataRange range) {
272             this->UVPipeOutData(range);
273           },
274           [this](ssize_t error) { this->UVPipeOutEnd(error); })) {
275       this->Result()->ErrorMessage =
276         "libuv start reading from stdout pipe failed";
277     }
278   }
279   if (!this->Result()->error()) {
280     if (!this->UVPipeErr_.startRead(
281           [this](cmUVPipeBuffer::DataRange range) {
282             this->UVPipeErrData(range);
283           },
284           [this](ssize_t error) { this->UVPipeErrEnd(error); })) {
285       this->Result()->ErrorMessage =
286         "libuv start reading from stderr pipe failed";
287     }
288   }
289
290   if (!this->Result()->error()) {
291     this->IsStarted_ = true;
292     this->FinishedCallback_ = std::move(finishedCallback);
293   } else {
294     // Clear libuv handles and finish
295     this->UVProcess_.reset();
296     this->UVPipeOut_.reset();
297     this->UVPipeErr_.reset();
298     this->CommandPtr_.clear();
299   }
300
301   return this->IsStarted();
302 }
303
304 void cmUVReadOnlyProcess::UVExit(uv_process_t* handle, int64_t exitStatus,
305                                  int termSignal)
306 {
307   auto& proc = *reinterpret_cast<cmUVReadOnlyProcess*>(handle->data);
308   if (proc.IsStarted() && !proc.IsFinished()) {
309     // Set error message on demand
310     proc.Result()->ExitStatus = exitStatus;
311     proc.Result()->TermSignal = termSignal;
312     if (!proc.Result()->error()) {
313       if (termSignal != 0) {
314         proc.Result()->ErrorMessage = cmStrCat(
315           "Process was terminated by signal ", proc.Result()->TermSignal);
316       } else if (exitStatus != 0) {
317         proc.Result()->ErrorMessage = cmStrCat(
318           "Process failed with return value ", proc.Result()->ExitStatus);
319       }
320     }
321
322     // Reset process handle
323     proc.UVProcess_.reset();
324     // Try finish
325     proc.UVTryFinish();
326   }
327 }
328
329 void cmUVReadOnlyProcess::UVPipeOutData(cmUVPipeBuffer::DataRange data) const
330 {
331   this->Result()->StdOut.append(data.begin(), data.end());
332 }
333
334 void cmUVReadOnlyProcess::UVPipeOutEnd(ssize_t error)
335 {
336   // Process pipe error
337   if ((error != 0) && !this->Result()->error()) {
338     this->Result()->ErrorMessage = cmStrCat(
339       "Reading from stdout pipe failed with libuv error code ", error);
340   }
341   // Try finish
342   this->UVTryFinish();
343 }
344
345 void cmUVReadOnlyProcess::UVPipeErrData(cmUVPipeBuffer::DataRange data) const
346 {
347   std::string* str = this->Setup_.MergedOutput ? &this->Result()->StdOut
348                                                : &this->Result()->StdErr;
349   str->append(data.begin(), data.end());
350 }
351
352 void cmUVReadOnlyProcess::UVPipeErrEnd(ssize_t error)
353 {
354   // Process pipe error
355   if ((error != 0) && !this->Result()->error()) {
356     this->Result()->ErrorMessage = cmStrCat(
357       "Reading from stderr pipe failed with libuv error code ", error);
358   }
359   // Try finish
360   this->UVTryFinish();
361 }
362
363 void cmUVReadOnlyProcess::UVTryFinish()
364 {
365   // There still might be data in the pipes after the process has finished.
366   // Therefore check if the process is finished AND all pipes are closed
367   // before signaling the worker thread to continue.
368   if ((this->UVProcess_.get() != nullptr) ||
369       (this->UVPipeOut_.uv_pipe() != nullptr) ||
370       (this->UVPipeErr_.uv_pipe() != nullptr)) {
371     return;
372   }
373   this->IsFinished_ = true;
374   this->FinishedCallback_();
375 }
376
377 /**
378  * @brief Worker pool worker thread
379  */
380 class cmWorkerPoolWorker
381 {
382 public:
383   cmWorkerPoolWorker(uv_loop_t& uvLoop);
384   ~cmWorkerPoolWorker();
385
386   cmWorkerPoolWorker(cmWorkerPoolWorker const&) = delete;
387   cmWorkerPoolWorker& operator=(cmWorkerPoolWorker const&) = delete;
388
389   /**
390    * Set the internal thread
391    */
392   void SetThread(std::thread&& aThread) { this->Thread_ = std::move(aThread); }
393
394   /**
395    * Run an external process
396    */
397   bool RunProcess(cmWorkerPool::ProcessResultT& result,
398                   std::vector<std::string> const& command,
399                   std::string const& workingDirectory);
400
401 private:
402   // -- Libuv callbacks
403   static void UVProcessStart(uv_async_t* handle);
404   void UVProcessFinished();
405
406   // -- Process management
407   struct
408   {
409     std::mutex Mutex;
410     cm::uv_async_ptr Request;
411     std::condition_variable Condition;
412     std::unique_ptr<cmUVReadOnlyProcess> ROP;
413   } Proc_;
414   // -- System thread
415   std::thread Thread_;
416 };
417
418 cmWorkerPoolWorker::cmWorkerPoolWorker(uv_loop_t& uvLoop)
419 {
420   this->Proc_.Request.init(uvLoop, &cmWorkerPoolWorker::UVProcessStart, this);
421 }
422
423 cmWorkerPoolWorker::~cmWorkerPoolWorker()
424 {
425   if (this->Thread_.joinable()) {
426     this->Thread_.join();
427   }
428 }
429
430 bool cmWorkerPoolWorker::RunProcess(cmWorkerPool::ProcessResultT& result,
431                                     std::vector<std::string> const& command,
432                                     std::string const& workingDirectory)
433 {
434   if (command.empty()) {
435     return false;
436   }
437   // Create process instance
438   {
439     std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
440     this->Proc_.ROP = cm::make_unique<cmUVReadOnlyProcess>();
441     this->Proc_.ROP->setup(&result, true, command, workingDirectory);
442   }
443   // Send asynchronous process start request to libuv loop
444   this->Proc_.Request.send();
445   // Wait until the process has been finished and destroyed
446   {
447     std::unique_lock<std::mutex> ulock(this->Proc_.Mutex);
448     while (this->Proc_.ROP) {
449       this->Proc_.Condition.wait(ulock);
450     }
451   }
452   return !result.error();
453 }
454
455 void cmWorkerPoolWorker::UVProcessStart(uv_async_t* handle)
456 {
457   auto* wrk = reinterpret_cast<cmWorkerPoolWorker*>(handle->data);
458   bool startFailed = false;
459   {
460     auto& Proc = wrk->Proc_;
461     std::lock_guard<std::mutex> lock(Proc.Mutex);
462     if (Proc.ROP && !Proc.ROP->IsStarted()) {
463       startFailed =
464         !Proc.ROP->start(handle->loop, [wrk] { wrk->UVProcessFinished(); });
465     }
466   }
467   // Clean up if starting of the process failed
468   if (startFailed) {
469     wrk->UVProcessFinished();
470   }
471 }
472
473 void cmWorkerPoolWorker::UVProcessFinished()
474 {
475   std::lock_guard<std::mutex> lock(this->Proc_.Mutex);
476   if (this->Proc_.ROP &&
477       (this->Proc_.ROP->IsFinished() || !this->Proc_.ROP->IsStarted())) {
478     this->Proc_.ROP.reset();
479   }
480   // Notify idling thread
481   this->Proc_.Condition.notify_one();
482 }
483
484 /**
485  * @brief Private worker pool internals
486  */
487 class cmWorkerPoolInternal
488 {
489 public:
490   // -- Constructors
491   cmWorkerPoolInternal(cmWorkerPool* pool);
492   ~cmWorkerPoolInternal();
493
494   /**
495    * Runs the libuv loop.
496    */
497   bool Process();
498
499   /**
500    * Clear queue and abort threads.
501    */
502   void Abort();
503
504   /**
505    * Push a job to the queue and notify a worker.
506    */
507   bool PushJob(cmWorkerPool::JobHandleT&& jobHandle);
508
509   /**
510    * Worker thread main loop method.
511    */
512   void Work(unsigned int workerIndex);
513
514   // -- Request slots
515   static void UVSlotBegin(uv_async_t* handle);
516   static void UVSlotEnd(uv_async_t* handle);
517
518   // -- UV loop
519 #ifdef CMAKE_UV_SIGNAL_HACK
520   std::unique_ptr<cmUVSignalHackRAII> UVHackRAII;
521 #endif
522   std::unique_ptr<uv_loop_t> UVLoop;
523   cm::uv_async_ptr UVRequestBegin;
524   cm::uv_async_ptr UVRequestEnd;
525
526   // -- Thread pool and job queue
527   std::mutex Mutex;
528   bool Processing = false;
529   bool Aborting = false;
530   bool FenceProcessing = false;
531   unsigned int WorkersRunning = 0;
532   unsigned int WorkersIdle = 0;
533   unsigned int JobsProcessing = 0;
534   std::deque<cmWorkerPool::JobHandleT> Queue;
535   std::condition_variable Condition;
536   std::condition_variable ConditionFence;
537   std::vector<std::unique_ptr<cmWorkerPoolWorker>> Workers;
538
539   // -- References
540   cmWorkerPool* Pool = nullptr;
541 };
542
543 void cmWorkerPool::ProcessResultT::reset()
544 {
545   this->ExitStatus = 0;
546   this->TermSignal = 0;
547   if (!this->StdOut.empty()) {
548     this->StdOut.clear();
549     this->StdOut.shrink_to_fit();
550   }
551   if (!this->StdErr.empty()) {
552     this->StdErr.clear();
553     this->StdErr.shrink_to_fit();
554   }
555   if (!this->ErrorMessage.empty()) {
556     this->ErrorMessage.clear();
557     this->ErrorMessage.shrink_to_fit();
558   }
559 }
560
561 cmWorkerPoolInternal::cmWorkerPoolInternal(cmWorkerPool* pool)
562   : Pool(pool)
563 {
564   // Initialize libuv loop
565   uv_disable_stdio_inheritance();
566 #ifdef CMAKE_UV_SIGNAL_HACK
567   UVHackRAII = cm::make_unique<cmUVSignalHackRAII>();
568 #endif
569   this->UVLoop = cm::make_unique<uv_loop_t>();
570   uv_loop_init(this->UVLoop.get());
571 }
572
573 cmWorkerPoolInternal::~cmWorkerPoolInternal()
574 {
575   uv_loop_close(this->UVLoop.get());
576 }
577
578 bool cmWorkerPoolInternal::Process()
579 {
580   // Reset state flags
581   this->Processing = true;
582   this->Aborting = false;
583   // Initialize libuv asynchronous request
584   this->UVRequestBegin.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotBegin,
585                             this);
586   this->UVRequestEnd.init(*this->UVLoop, &cmWorkerPoolInternal::UVSlotEnd,
587                           this);
588   // Send begin request
589   this->UVRequestBegin.send();
590   // Run libuv loop
591   bool success = (uv_run(this->UVLoop.get(), UV_RUN_DEFAULT) == 0);
592   // Update state flags
593   this->Processing = false;
594   this->Aborting = false;
595   return success;
596 }
597
598 void cmWorkerPoolInternal::Abort()
599 {
600   // Clear all jobs and set abort flag
601   std::lock_guard<std::mutex> guard(this->Mutex);
602   if (!this->Aborting) {
603     // Register abort and clear queue
604     this->Aborting = true;
605     this->Queue.clear();
606     this->Condition.notify_all();
607   }
608 }
609
610 inline bool cmWorkerPoolInternal::PushJob(cmWorkerPool::JobHandleT&& jobHandle)
611 {
612   std::lock_guard<std::mutex> guard(this->Mutex);
613   if (this->Aborting) {
614     return false;
615   }
616   // Append the job to the queue
617   this->Queue.emplace_back(std::move(jobHandle));
618   // Notify an idle worker if there's one
619   if (this->WorkersIdle != 0) {
620     this->Condition.notify_one();
621   }
622   // Return success
623   return true;
624 }
625
626 void cmWorkerPoolInternal::UVSlotBegin(uv_async_t* handle)
627 {
628   auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
629   // Create worker threads
630   {
631     unsigned int const num = gint.Pool->ThreadCount();
632     // Create workers
633     gint.Workers.reserve(num);
634     for (unsigned int ii = 0; ii != num; ++ii) {
635       gint.Workers.emplace_back(
636         cm::make_unique<cmWorkerPoolWorker>(*gint.UVLoop));
637     }
638     // Start worker threads
639     for (unsigned int ii = 0; ii != num; ++ii) {
640       gint.Workers[ii]->SetThread(
641         std::thread(&cmWorkerPoolInternal::Work, &gint, ii));
642     }
643   }
644   // Destroy begin request
645   gint.UVRequestBegin.reset();
646 }
647
648 void cmWorkerPoolInternal::UVSlotEnd(uv_async_t* handle)
649 {
650   auto& gint = *reinterpret_cast<cmWorkerPoolInternal*>(handle->data);
651   // Join and destroy worker threads
652   gint.Workers.clear();
653   // Destroy end request
654   gint.UVRequestEnd.reset();
655 }
656
657 void cmWorkerPoolInternal::Work(unsigned int workerIndex)
658 {
659   cmWorkerPool::JobHandleT jobHandle;
660   std::unique_lock<std::mutex> uLock(this->Mutex);
661   // Increment running workers count
662   ++this->WorkersRunning;
663   // Enter worker main loop
664   while (true) {
665     // Abort on request
666     if (this->Aborting) {
667       break;
668     }
669     // Wait for new jobs on the main CV
670     if (this->Queue.empty()) {
671       ++this->WorkersIdle;
672       this->Condition.wait(uLock);
673       --this->WorkersIdle;
674       continue;
675     }
676
677     // If there is a fence currently active or waiting,
678     // sleep on the main CV and try again.
679     if (this->FenceProcessing) {
680       this->Condition.wait(uLock);
681       continue;
682     }
683
684     // Pop next job from queue
685     jobHandle = std::move(this->Queue.front());
686     this->Queue.pop_front();
687
688     // Check for fence jobs
689     bool raisedFence = false;
690     if (jobHandle->IsFence()) {
691       this->FenceProcessing = true;
692       raisedFence = true;
693       // Wait on the Fence CV until all pending jobs are done.
694       while (this->JobsProcessing != 0 && !this->Aborting) {
695         this->ConditionFence.wait(uLock);
696       }
697       // When aborting, explicitly kick all threads alive once more.
698       if (this->Aborting) {
699         this->FenceProcessing = false;
700         this->Condition.notify_all();
701         break;
702       }
703     }
704
705     // Unlocked scope for job processing
706     ++this->JobsProcessing;
707     {
708       uLock.unlock();
709       jobHandle->Work(this->Pool, workerIndex); // Process job
710       jobHandle.reset();                        // Destroy job
711       uLock.lock();
712     }
713     --this->JobsProcessing;
714
715     // If this was the thread that entered fence processing
716     // originally, notify all idling workers that the fence
717     // is done.
718     if (raisedFence) {
719       this->FenceProcessing = false;
720       this->Condition.notify_all();
721     }
722     // If fence processing is still not done, notify the
723     // the fencing worker when all active jobs are done.
724     if (this->FenceProcessing && this->JobsProcessing == 0) {
725       this->ConditionFence.notify_all();
726     }
727   }
728
729   // Decrement running workers count
730   if (--this->WorkersRunning == 0) {
731     // Last worker thread about to finish. Send libuv event.
732     this->UVRequestEnd.send();
733   }
734 }
735
736 cmWorkerPool::JobT::~JobT() = default;
737
738 bool cmWorkerPool::JobT::RunProcess(ProcessResultT& result,
739                                     std::vector<std::string> const& command,
740                                     std::string const& workingDirectory)
741 {
742   // Get worker by index
743   auto* wrk = this->Pool_->Int_->Workers.at(this->WorkerIndex_).get();
744   return wrk->RunProcess(result, command, workingDirectory);
745 }
746
747 cmWorkerPool::cmWorkerPool()
748   : Int_(cm::make_unique<cmWorkerPoolInternal>(this))
749 {
750 }
751
752 cmWorkerPool::~cmWorkerPool() = default;
753
754 void cmWorkerPool::SetThreadCount(unsigned int threadCount)
755 {
756   if (!this->Int_->Processing) {
757     this->ThreadCount_ = (threadCount > 0) ? threadCount : 1u;
758   }
759 }
760
761 bool cmWorkerPool::Process(void* userData)
762 {
763   // Setup user data
764   this->UserData_ = userData;
765   // Run libuv loop
766   bool success = this->Int_->Process();
767   // Clear user data
768   this->UserData_ = nullptr;
769   // Return
770   return success;
771 }
772
773 bool cmWorkerPool::PushJob(JobHandleT&& jobHandle)
774 {
775   return this->Int_->PushJob(std::move(jobHandle));
776 }
777
778 void cmWorkerPool::Abort()
779 {
780   this->Int_->Abort();
781 }