[OpenMP] Add non-blocking support for target nowait regions
authorGuilherme Valarini <guilherme.a.valarini@gmail.com>
Wed, 14 Dec 2022 16:46:23 +0000 (13:46 -0300)
committerGuilherme Valarini <guilherme.a.valarini@gmail.com>
Wed, 14 Dec 2022 17:03:32 +0000 (14:03 -0300)
This patch better integrates the target nowait functions with the tasking runtime. It splits the nowait execution into two stages: a dispatch stage, which triggers all the necessary asynchronous device operations and stores a set of post-processing procedures that must be executed after said ops; and a synchronization stage, responsible for synchronizing the previous operations in a non-blocking manner and running the appropriate post-processing functions. Suppose during the synchronization stage the operations are not completed. In that case, the attached hidden helper task is re-enqueued to any hidden helper thread to be later synchronized, allowing other target nowait regions to be concurrently dispatched.

Reviewed By: jdoerfert, tianshilei1992

Differential Revision: https://reviews.llvm.org/D132005

19 files changed:
openmp/libomptarget/include/device.h
openmp/libomptarget/include/omptarget.h
openmp/libomptarget/include/omptargetplugin.h
openmp/libomptarget/include/rtl.h
openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.cpp
openmp/libomptarget/plugins-nextgen/common/PluginInterface/PluginInterface.h
openmp/libomptarget/plugins-nextgen/cuda/src/rtl.cpp
openmp/libomptarget/plugins-nextgen/generic-elf-64bit/src/rtl.cpp
openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.cpp
openmp/libomptarget/plugins/cuda/dynamic_cuda/cuda.h
openmp/libomptarget/plugins/cuda/src/rtl.cpp
openmp/libomptarget/src/device.cpp
openmp/libomptarget/src/exports
openmp/libomptarget/src/interface.cpp
openmp/libomptarget/src/omptarget.cpp
openmp/libomptarget/src/private.h
openmp/libomptarget/src/rtl.cpp
openmp/runtime/src/kmp.h
openmp/runtime/src/kmp_tasking.cpp

index 8a38634..c7513e5 100644 (file)
@@ -438,6 +438,12 @@ struct DeviceTy {
   /// OFFLOAD_SUCCESS/OFFLOAD_FAIL when succeeds/fails.
   int32_t synchronize(AsyncInfoTy &AsyncInfo);
 
+  /// Query for device/queue/event based completion on \p AsyncInfo in a
+  /// non-blocking manner and return OFFLOAD_SUCCESS/OFFLOAD_FAIL when
+  /// succeeds/fails. Must be called multiple times until AsyncInfo is
+  /// completed and AsyncInfo.isDone() returns true.
+  int32_t queryAsync(AsyncInfoTy &AsyncInfo);
+
   /// Calls the corresponding print in the \p RTLDEVID
   /// device RTL to obtain the information of the specific device.
   bool printDeviceInfo(int32_t RTLDevID);
index c805ce7..93f6800 100644 (file)
 #define _OMPTARGET_H_
 
 #include <deque>
+#include <functional>
 #include <stddef.h>
 #include <stdint.h>
+#include <type_traits>
 
 #include <SourceInfo.h>
 
+#include "llvm/ADT/SmallVector.h"
+
 #define OFFLOAD_SUCCESS (0)
 #define OFFLOAD_FAIL (~0)
 
@@ -181,15 +185,29 @@ struct DeviceTy;
 /// associated with a libomptarget layer device. RAII semantics to avoid
 /// mistakes.
 class AsyncInfoTy {
+public:
+  enum class SyncTy { BLOCKING, NON_BLOCKING };
+
+private:
   /// Locations we used in (potentially) asynchronous calls which should live
   /// as long as this AsyncInfoTy object.
   std::deque<void *> BufferLocations;
 
+  /// Post-processing operations executed after a successful synchronization.
+  /// \note the post-processing function should return OFFLOAD_SUCCESS or
+  /// OFFLOAD_FAIL appropriately.
+  using PostProcFuncTy = std::function<int()>;
+  llvm::SmallVector<PostProcFuncTy> PostProcessingFunctions;
+
   __tgt_async_info AsyncInfo;
   DeviceTy &Device;
 
 public:
-  AsyncInfoTy(DeviceTy &Device) : Device(Device) {}
+  /// Synchronization method to be used.
+  SyncTy SyncType;
+
+  AsyncInfoTy(DeviceTy &Device, SyncTy SyncType = SyncTy::BLOCKING)
+      : Device(Device), SyncType(SyncType) {}
   ~AsyncInfoTy() { synchronize(); }
 
   /// Implicit conversion to the __tgt_async_info which is used in the
@@ -198,12 +216,54 @@ public:
 
   /// Synchronize all pending actions.
   ///
+  /// \note synchronization will be performance in a blocking or non-blocking
+  /// manner, depending on the SyncType.
+  ///
+  /// \note if the operations are completed, the registered post-processing
+  /// functions will be executed once and unregistered afterwards.
+  ///
   /// \returns OFFLOAD_FAIL or OFFLOAD_SUCCESS appropriately.
   int synchronize();
 
   /// Return a void* reference with a lifetime that is at least as long as this
   /// AsyncInfoTy object. The location can be used as intermediate buffer.
   void *&getVoidPtrLocation();
+
+  /// Check if all asynchronous operations are completed.
+  ///
+  /// \note if the operations are completed, the registered post-processing
+  /// functions will be executed once and unregistered afterwards.
+  ///
+  /// \returns true if there is no pending asynchronous operations, false
+  /// otherwise.
+  bool isDone();
+
+  /// Add a new post-processing function to be executed after synchronization.
+  ///
+  /// \param[in] Function is a templated function (e.g., function pointers,
+  /// lambdas, std::function) that can be convertible to a PostProcFuncTy (i.e.,
+  /// it must have int() as its function signature).
+  template <typename FuncTy> void addPostProcessingFunction(FuncTy &&Function) {
+    static_assert(std::is_convertible_v<FuncTy, PostProcFuncTy>,
+                  "Invalid post-processing function type. Please check "
+                  "function signature!");
+    PostProcessingFunctions.emplace_back(Function);
+  }
+
+private:
+  /// Run all the post-processing functions sequentially.
+  ///
+  /// \note after a successful execution, all previously registered functions
+  /// are unregistered.
+  ///
+  /// \returns OFFLOAD_FAIL if any post-processing function failed,
+  /// OFFLOAD_SUCCESS otherwise.
+  int32_t runPostProcessing();
+
+  /// Check if the internal asynchronous info queue is empty or not.
+  ///
+  /// \returns true if empty, false otherwise.
+  bool isQueueEmpty() const;
 };
 
 /// This struct is a record of non-contiguous information
@@ -347,6 +407,15 @@ int __tgt_target_kernel_nowait(ident_t *Loc, int64_t DeviceId, int32_t NumTeams,
                                void *DepList, int32_t NoAliasDepNum,
                                void *NoAliasDepList);
 
+// Non-blocking synchronization for target nowait regions. This function
+// acquires the asynchronous context from task data of the current task being
+// executed and tries to query for the completion of its operations. If the
+// operations are still pending, the function returns immediately. If the
+// operations are completed, all the post-processing procedures stored in the
+// asynchronous context are executed and the context is removed from the task
+// data.
+void __tgt_target_nowait_query(void **AsyncHandle);
+
 void __tgt_set_info_flag(uint32_t);
 
 int __tgt_print_device_info(int64_t DeviceId);
index 476f459..50fa6e9 100644 (file)
@@ -156,6 +156,16 @@ int32_t __tgt_rtl_run_target_team_region_async(
 // error code.
 int32_t __tgt_rtl_synchronize(int32_t ID, __tgt_async_info *AsyncInfo);
 
+// Queries for the completion of asynchronous operations. Instead of blocking
+// the calling thread as __tgt_rtl_synchronize, the progress of the operations
+// stored in AsyncInfo->Queue is queried in a non-blocking manner, partially
+// advancing their execution. If all operations are completed, AsyncInfo->Queue
+// is set to nullptr. If there are still pending operations, AsyncInfo->Queue is
+// kept as a valid queue. In any case of success (i.e., successful query
+// with/without completing all operations), return zero. Otherwise, return an
+// error code.
+int32_t __tgt_rtl_query_async(int32_t ID, __tgt_async_info *AsyncInfo);
+
 // Set plugin's internal information flag externally.
 void __tgt_rtl_set_info_flag(uint32_t);
 
index b0d0e18..a736731 100644 (file)
@@ -62,6 +62,7 @@ struct RTLInfoTy {
                                             __tgt_async_info *);
   typedef int64_t(init_requires_ty)(int64_t);
   typedef int32_t(synchronize_ty)(int32_t, __tgt_async_info *);
+  typedef int32_t(query_async_ty)(int32_t, __tgt_async_info *);
   typedef int32_t (*register_lib_ty)(__tgt_bin_desc *);
   typedef int32_t(supports_empty_images_ty)();
   typedef void(print_device_info_ty)(int32_t);
@@ -112,6 +113,7 @@ struct RTLInfoTy {
   run_team_region_async_ty *run_team_region_async = nullptr;
   init_requires_ty *init_requires = nullptr;
   synchronize_ty *synchronize = nullptr;
+  query_async_ty *query_async = nullptr;
   register_lib_ty register_lib = nullptr;
   register_lib_ty unregister_lib = nullptr;
   supports_empty_images_ty *supports_empty_images = nullptr;
index d164ddd..eb03e3b 100644 (file)
@@ -354,6 +354,13 @@ Error GenericDeviceTy::synchronize(__tgt_async_info *AsyncInfo) {
   return synchronizeImpl(*AsyncInfo);
 }
 
+Error GenericDeviceTy::queryAsync(__tgt_async_info *AsyncInfo) {
+  if (!AsyncInfo || !AsyncInfo->Queue)
+    return Plugin::error("Invalid async info queue");
+
+  return queryAsyncImpl(*AsyncInfo);
+}
+
 Expected<void *> GenericDeviceTy::dataAlloc(int64_t Size, void *HostPtr,
                                             TargetAllocTy Kind) {
   void *Alloc = nullptr;
@@ -791,6 +798,16 @@ int32_t __tgt_rtl_synchronize(int32_t DeviceId,
   return (bool)Err;
 }
 
+int32_t __tgt_rtl_query_async(int32_t DeviceId,
+                              __tgt_async_info *AsyncInfoPtr) {
+  auto Err = Plugin::get().getDevice(DeviceId).queryAsync(AsyncInfoPtr);
+  if (Err)
+    REPORT("Failure to query stream %p: %s\n", AsyncInfoPtr->Queue,
+           toString(std::move(Err)).data());
+
+  return (bool)Err;
+}
+
 int32_t __tgt_rtl_run_target_region(int32_t DeviceId, void *TgtEntryPtr,
                                     void **TgtArgs, ptrdiff_t *TgtOffsets,
                                     int32_t NumArgs) {
index 4882755..eeb6f69 100644 (file)
@@ -290,6 +290,11 @@ struct GenericDeviceTy : public DeviceAllocatorTy {
   Error synchronize(__tgt_async_info *AsyncInfo);
   virtual Error synchronizeImpl(__tgt_async_info &AsyncInfo) = 0;
 
+  /// Query for the completion of the pending operations on the __tgt_async_info
+  /// structure in a non-blocking manner.
+  Error queryAsync(__tgt_async_info *AsyncInfo);
+  virtual Error queryAsyncImpl(__tgt_async_info &AsyncInfo) = 0;
+
   /// Allocate data on the device or involving the device.
   Expected<void *> dataAlloc(int64_t Size, void *HostPtr, TargetAllocTy Kind);
 
index 1e16994..8ab6089 100644 (file)
@@ -486,6 +486,24 @@ struct CUDADeviceTy : public GenericDeviceTy {
     return Plugin::check(Res, "Error in cuStreamSynchronize: %s");
   }
 
+  /// Query for the completion of the pending operations on the async info.
+  Error queryAsyncImpl(__tgt_async_info &AsyncInfo) override {
+    CUstream Stream = reinterpret_cast<CUstream>(AsyncInfo.Queue);
+    CUresult Res = cuStreamQuery(Stream);
+
+    // Not ready streams must be considered as successful operations.
+    if (Res == CUDA_ERROR_NOT_READY)
+      return Plugin::success();
+
+    // Once the stream is synchronized and the operations completed (or an error
+    // occurs), return it to stream pool and reset AsyncInfo. This is to make
+    // sure the synchronization only works for its own tasks.
+    CUDAStreamManager.returnResource(Stream);
+    AsyncInfo.Queue = nullptr;
+
+    return Plugin::check(Res, "Error in cuStreamQuery: %s");
+  }
+
   /// Submit data to the device (host to device transfer).
   Error dataSubmitImpl(void *TgtPtr, const void *HstPtr, int64_t Size,
                        AsyncInfoWrapperTy &AsyncInfoWrapper) override {
index ff09b96..56ed371 100644 (file)
@@ -245,6 +245,12 @@ struct GenELF64DeviceTy : public GenericDeviceTy {
     return Plugin::success();
   }
 
+  /// All functions are already synchronous. No need to do anything on this
+  /// query function.
+  Error queryAsyncImpl(__tgt_async_info &AsyncInfo) override {
+    return Plugin::success();
+  }
+
   /// This plugin does not support interoperability
   Error initAsyncInfoImpl(AsyncInfoWrapperTy &AsyncInfoWrapper) override {
     return Plugin::error("initAsyncInfoImpl not supported");
index a7b4a8d..8f9ccec 100644 (file)
@@ -58,6 +58,7 @@ DLWRAP(cuModuleUnload, 1)
 DLWRAP(cuStreamCreate, 2)
 DLWRAP(cuStreamDestroy, 1)
 DLWRAP(cuStreamSynchronize, 1)
+DLWRAP(cuStreamQuery, 1)
 DLWRAP(cuCtxSetCurrent, 1)
 DLWRAP(cuDevicePrimaryCtxRelease, 1)
 DLWRAP(cuDevicePrimaryCtxGetState, 3)
index 1d60948..459236f 100644 (file)
@@ -31,6 +31,7 @@ typedef enum cudaError_enum {
   CUDA_ERROR_INVALID_VALUE = 1,
   CUDA_ERROR_NO_DEVICE = 100,
   CUDA_ERROR_INVALID_HANDLE = 400,
+  CUDA_ERROR_NOT_READY = 600,
   CUDA_ERROR_TOO_MANY_PEERS = 711,
 } CUresult;
 
@@ -244,6 +245,7 @@ CUresult cuModuleUnload(CUmodule);
 CUresult cuStreamCreate(CUstream *, unsigned);
 CUresult cuStreamDestroy(CUstream);
 CUresult cuStreamSynchronize(CUstream);
+CUresult cuStreamQuery(CUstream);
 CUresult cuCtxSetCurrent(CUcontext);
 CUresult cuDevicePrimaryCtxRelease(CUdevice);
 CUresult cuDevicePrimaryCtxGetState(CUdevice, unsigned *, int *);
index 5f249ce..3bb060f 100644 (file)
@@ -1267,6 +1267,29 @@ public:
     return (Err == CUDA_SUCCESS) ? OFFLOAD_SUCCESS : OFFLOAD_FAIL;
   }
 
+  int queryAsync(const int DeviceId, __tgt_async_info *AsyncInfo) const {
+    CUstream Stream = reinterpret_cast<CUstream>(AsyncInfo->Queue);
+    CUresult Err = cuStreamQuery(Stream);
+
+    // Not ready streams must be considered as successful operations.
+    if (Err == CUDA_ERROR_NOT_READY)
+      return OFFLOAD_SUCCESS;
+
+    // Once the stream is synchronized or an error occurs, return it to the
+    // stream pool and reset AsyncInfo. This is to make sure the
+    // synchronization only works for its own tasks.
+    StreamPool[DeviceId]->release(Stream);
+    AsyncInfo->Queue = nullptr;
+
+    if (Err != CUDA_SUCCESS) {
+      DP("Error when querying for stream progress. stream = " DPxMOD
+         ", async info ptr = " DPxMOD "\n",
+         DPxPTR(Stream), DPxPTR(AsyncInfo));
+      CUDA_ERR_STRING(Err);
+    }
+    return (Err == CUDA_SUCCESS) ? OFFLOAD_SUCCESS : OFFLOAD_FAIL;
+  }
+
   void printDeviceInfo(int32_t DeviceId) {
     char TmpChar[1000];
     std::string TmpStr;
@@ -1780,6 +1803,15 @@ int32_t __tgt_rtl_synchronize(int32_t DeviceId,
   return DeviceRTL.synchronize(DeviceId, AsyncInfoPtr);
 }
 
+int32_t __tgt_rtl_query_async(int32_t DeviceId,
+                              __tgt_async_info *AsyncInfoPtr) {
+  assert(DeviceRTL.isValidDeviceId(DeviceId) && "device_id is invalid");
+  assert(AsyncInfoPtr && "async_info_ptr is nullptr");
+  assert(AsyncInfoPtr->Queue && "async_info_ptr->Queue is nullptr");
+  // NOTE: We don't need to set context for stream query.
+  return DeviceRTL.queryAsync(DeviceId, AsyncInfoPtr);
+}
+
 void __tgt_rtl_set_info_flag(uint32_t NewInfoLevel) {
   std::atomic<uint32_t> &InfoLevel = getInfoLevelInternal();
   InfoLevel.store(NewInfoLevel);
index 1dffc76..6c09ec1 100644 (file)
@@ -641,6 +641,13 @@ int32_t DeviceTy::synchronize(AsyncInfoTy &AsyncInfo) {
   return OFFLOAD_SUCCESS;
 }
 
+int32_t DeviceTy::queryAsync(AsyncInfoTy &AsyncInfo) {
+  if (RTL->query_async)
+    return RTL->query_async(RTLDeviceID, AsyncInfo);
+
+  return synchronize(AsyncInfo);
+}
+
 int32_t DeviceTy::createEvent(void **Event) {
   if (RTL->create_event)
     return RTL->create_event(RTLDeviceID, Event);
index 7fc7e81..fe1c015 100644 (file)
@@ -26,6 +26,7 @@ VERS1.0 {
     __tgt_target_teams_nowait_mapper;
     __tgt_target_kernel;
     __tgt_target_kernel_nowait;
+    __tgt_target_nowait_query;
     __tgt_mapper_num_components;
     __tgt_push_mapper_component;
     __kmpc_push_target_tripcount;
@@ -60,4 +61,3 @@ VERS1.0 {
   local:
     *;
 };
-
index c58be91..bee3d5b 100644 (file)
 #include "private.h"
 #include "rtl.h"
 
+#include "Utilities.h"
+
 #include <cassert>
 #include <cstdio>
 #include <cstdlib>
 #include <mutex>
+#include <type_traits>
 
 ////////////////////////////////////////////////////////////////////////////////
 /// adds requires flags
@@ -61,28 +64,29 @@ EXTERN void __tgt_unregister_lib(__tgt_bin_desc *Desc) {
   }
 }
 
-/// creates host-to-target data mapping, stores it in the
-/// libomptarget.so internal structure (an entry in a stack of data maps)
-/// and passes the data to the device.
-EXTERN void __tgt_target_data_begin_mapper(ident_t *Loc, int64_t DeviceId,
-                                           int32_t ArgNum, void **ArgsBase,
-                                           void **Args, int64_t *ArgSizes,
-                                           int64_t *ArgTypes,
-                                           map_var_info_t *ArgNames,
-                                           void **ArgMappers) {
+template <typename TargetAsyncInfoTy>
+static inline void
+targetDataMapper(ident_t *Loc, int64_t DeviceId, int32_t ArgNum,
+                 void **ArgsBase, void **Args, int64_t *ArgSizes,
+                 int64_t *ArgTypes, map_var_info_t *ArgNames, void **ArgMappers,
+                 TargetDataFuncPtrTy TargetDataFunction,
+                 const char *RegionTypeMsg, const char *RegionName) {
+  static_assert(std::is_convertible_v<TargetAsyncInfoTy, AsyncInfoTy>,
+                "TargetAsyncInfoTy must be convertible to AsyncInfoTy.");
+
   TIMESCOPE_WITH_IDENT(Loc);
-  DP("Entering data begin region for device %" PRId64 " with %d mappings\n",
-     DeviceId, ArgNum);
+
+  DP("Entering data %s region for device %" PRId64 " with %d mappings\n",
+     RegionName, DeviceId, ArgNum);
+
   if (checkDeviceAndCtors(DeviceId, Loc)) {
     DP("Not offloading to device %" PRId64 "\n", DeviceId);
     return;
   }
 
-  DeviceTy &Device = *PM->Devices[DeviceId];
-
   if (getInfoLevel() & OMP_INFOTYPE_KERNEL_ARGS)
     printKernelArguments(Loc, DeviceId, ArgNum, ArgSizes, ArgTypes, ArgNames,
-                         "Entering OpenMP data region");
+                         RegionTypeMsg);
 #ifdef OMPTARGET_DEBUG
   for (int I = 0; I < ArgNum; ++I) {
     DP("Entry %2d: Base=" DPxMOD ", Begin=" DPxMOD ", Size=%" PRId64
@@ -92,23 +96,45 @@ EXTERN void __tgt_target_data_begin_mapper(ident_t *Loc, int64_t DeviceId,
   }
 #endif
 
-  AsyncInfoTy AsyncInfo(Device);
-  int Rc = targetDataBegin(Loc, Device, ArgNum, ArgsBase, Args, ArgSizes,
-                           ArgTypes, ArgNames, ArgMappers, AsyncInfo);
+  DeviceTy &Device = *PM->Devices[DeviceId];
+  TargetAsyncInfoTy TargetAsyncInfo(Device);
+  AsyncInfoTy &AsyncInfo = TargetAsyncInfo;
+
+  int Rc = OFFLOAD_SUCCESS;
+  Rc = TargetDataFunction(Loc, Device, ArgNum, ArgsBase, Args, ArgSizes,
+                          ArgTypes, ArgNames, ArgMappers, AsyncInfo,
+                          false /* FromMapper */);
+
   if (Rc == OFFLOAD_SUCCESS)
     Rc = AsyncInfo.synchronize();
+
   handleTargetOutcome(Rc == OFFLOAD_SUCCESS, Loc);
 }
 
+/// creates host-to-target data mapping, stores it in the
+/// libomptarget.so internal structure (an entry in a stack of data maps)
+/// and passes the data to the device.
+EXTERN void __tgt_target_data_begin_mapper(ident_t *Loc, int64_t DeviceId,
+                                           int32_t ArgNum, void **ArgsBase,
+                                           void **Args, int64_t *ArgSizes,
+                                           int64_t *ArgTypes,
+                                           map_var_info_t *ArgNames,
+                                           void **ArgMappers) {
+  TIMESCOPE_WITH_IDENT(Loc);
+  targetDataMapper<AsyncInfoTy>(Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes,
+                                ArgTypes, ArgNames, ArgMappers, targetDataBegin,
+                                "Entering OpenMP data region", "begin");
+}
+
 EXTERN void __tgt_target_data_begin_nowait_mapper(
     ident_t *Loc, int64_t DeviceId, int32_t ArgNum, void **ArgsBase,
     void **Args, int64_t *ArgSizes, int64_t *ArgTypes, map_var_info_t *ArgNames,
     void **ArgMappers, int32_t DepNum, void *DepList, int32_t NoAliasDepNum,
     void *NoAliasDepList) {
   TIMESCOPE_WITH_IDENT(Loc);
-
-  __tgt_target_data_begin_mapper(Loc, DeviceId, ArgNum, ArgsBase, Args,
-                                 ArgSizes, ArgTypes, ArgNames, ArgMappers);
+  targetDataMapper<TaskAsyncInfoWrapperTy>(
+      Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes, ArgTypes, ArgNames,
+      ArgMappers, targetDataBegin, "Entering OpenMP data region", "begin");
 }
 
 /// passes data from the target, releases target memory and destroys
@@ -121,32 +147,9 @@ EXTERN void __tgt_target_data_end_mapper(ident_t *Loc, int64_t DeviceId,
                                          map_var_info_t *ArgNames,
                                          void **ArgMappers) {
   TIMESCOPE_WITH_IDENT(Loc);
-  DP("Entering data end region with %d mappings\n", ArgNum);
-  if (checkDeviceAndCtors(DeviceId, Loc)) {
-    DP("Not offloading to device %" PRId64 "\n", DeviceId);
-    return;
-  }
-
-  DeviceTy &Device = *PM->Devices[DeviceId];
-
-  if (getInfoLevel() & OMP_INFOTYPE_KERNEL_ARGS)
-    printKernelArguments(Loc, DeviceId, ArgNum, ArgSizes, ArgTypes, ArgNames,
-                         "Exiting OpenMP data region");
-#ifdef OMPTARGET_DEBUG
-  for (int I = 0; I < ArgNum; ++I) {
-    DP("Entry %2d: Base=" DPxMOD ", Begin=" DPxMOD ", Size=%" PRId64
-       ", Type=0x%" PRIx64 ", Name=%s\n",
-       I, DPxPTR(ArgsBase[I]), DPxPTR(Args[I]), ArgSizes[I], ArgTypes[I],
-       (ArgNames) ? getNameFromMapping(ArgNames[I]).c_str() : "unknown");
-  }
-#endif
-
-  AsyncInfoTy AsyncInfo(Device);
-  int Rc = targetDataEnd(Loc, Device, ArgNum, ArgsBase, Args, ArgSizes,
-                         ArgTypes, ArgNames, ArgMappers, AsyncInfo);
-  if (Rc == OFFLOAD_SUCCESS)
-    Rc = AsyncInfo.synchronize();
-  handleTargetOutcome(Rc == OFFLOAD_SUCCESS, Loc);
+  targetDataMapper<AsyncInfoTy>(Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes,
+                                ArgTypes, ArgNames, ArgMappers, targetDataEnd,
+                                "Exiting OpenMP data region", "end");
 }
 
 EXTERN void __tgt_target_data_end_nowait_mapper(
@@ -155,9 +158,9 @@ EXTERN void __tgt_target_data_end_nowait_mapper(
     void **ArgMappers, int32_t DepNum, void *DepList, int32_t NoAliasDepNum,
     void *NoAliasDepList) {
   TIMESCOPE_WITH_IDENT(Loc);
-
-  __tgt_target_data_end_mapper(Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes,
-                               ArgTypes, ArgNames, ArgMappers);
+  targetDataMapper<TaskAsyncInfoWrapperTy>(
+      Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes, ArgTypes, ArgNames,
+      ArgMappers, targetDataEnd, "Exiting OpenMP data region", "end");
 }
 
 EXTERN void __tgt_target_data_update_mapper(ident_t *Loc, int64_t DeviceId,
@@ -167,23 +170,9 @@ EXTERN void __tgt_target_data_update_mapper(ident_t *Loc, int64_t DeviceId,
                                             map_var_info_t *ArgNames,
                                             void **ArgMappers) {
   TIMESCOPE_WITH_IDENT(Loc);
-  DP("Entering data update with %d mappings\n", ArgNum);
-  if (checkDeviceAndCtors(DeviceId, Loc)) {
-    DP("Not offloading to device %" PRId64 "\n", DeviceId);
-    return;
-  }
-
-  if (getInfoLevel() & OMP_INFOTYPE_KERNEL_ARGS)
-    printKernelArguments(Loc, DeviceId, ArgNum, ArgSizes, ArgTypes, ArgNames,
-                         "Updating OpenMP data");
-
-  DeviceTy &Device = *PM->Devices[DeviceId];
-  AsyncInfoTy AsyncInfo(Device);
-  int Rc = targetDataUpdate(Loc, Device, ArgNum, ArgsBase, Args, ArgSizes,
-                            ArgTypes, ArgNames, ArgMappers, AsyncInfo);
-  if (Rc == OFFLOAD_SUCCESS)
-    Rc = AsyncInfo.synchronize();
-  handleTargetOutcome(Rc == OFFLOAD_SUCCESS, Loc);
+  targetDataMapper<AsyncInfoTy>(
+      Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes, ArgTypes, ArgNames,
+      ArgMappers, targetDataUpdate, "Updating OpenMP data", "update");
 }
 
 EXTERN void __tgt_target_data_update_nowait_mapper(
@@ -192,37 +181,33 @@ EXTERN void __tgt_target_data_update_nowait_mapper(
     void **ArgMappers, int32_t DepNum, void *DepList, int32_t NoAliasDepNum,
     void *NoAliasDepList) {
   TIMESCOPE_WITH_IDENT(Loc);
-
-  __tgt_target_data_update_mapper(Loc, DeviceId, ArgNum, ArgsBase, Args,
-                                  ArgSizes, ArgTypes, ArgNames, ArgMappers);
+  targetDataMapper<TaskAsyncInfoWrapperTy>(
+      Loc, DeviceId, ArgNum, ArgsBase, Args, ArgSizes, ArgTypes, ArgNames,
+      ArgMappers, targetDataUpdate, "Updating OpenMP data", "update");
 }
 
-/// Implements a kernel entry that executes the target region on the specified
-/// device.
-///
-/// \param Loc Source location associated with this target region.
-/// \param DeviceId The device to execute this region, -1 indicated the default.
-/// \param NumTeams Number of teams to launch the region with, -1 indicates a
-///                 non-teams region and 0 indicates it was unspecified.
-/// \param ThreadLimit Limit to the number of threads to use in the kernel
-///                    launch, 0 indicates it was unspecified.
-/// \param HostPtr  The pointer to the host function registered with the kernel.
-/// \param Args     All arguments to this kernel launch (see struct definition).
-EXTERN int __tgt_target_kernel(ident_t *Loc, int64_t DeviceId, int32_t NumTeams,
+template <typename TargetAsyncInfoTy>
+static inline int targetKernel(ident_t *Loc, int64_t DeviceId, int32_t NumTeams,
                                int32_t ThreadLimit, void *HostPtr,
                                __tgt_kernel_arguments *Args) {
+  static_assert(std::is_convertible_v<TargetAsyncInfoTy, AsyncInfoTy>,
+                "Target AsyncInfoTy must be convertible to AsyncInfoTy.");
+
   TIMESCOPE_WITH_IDENT(Loc);
-  DP("Entering target region with entry point " DPxMOD " and device Id %" PRId64
+
+  DP("Entering target region for device %" PRId64 " with entry point " DPxMOD
      "\n",
-     DPxPTR(HostPtr), DeviceId);
-  if (Args->Version != 1) {
-    DP("Unexpected ABI version: %d\n", Args->Version);
-  }
+     DeviceId, DPxPTR(HostPtr));
+
   if (checkDeviceAndCtors(DeviceId, Loc)) {
     DP("Not offloading to device %" PRId64 "\n", DeviceId);
     return OMP_TGT_FAIL;
   }
 
+  if (Args->Version != 1) {
+    DP("Unexpected ABI version: %d\n", Args->Version);
+  }
+
   if (getInfoLevel() & OMP_INFOTYPE_KERNEL_ARGS)
     printKernelArguments(Loc, DeviceId, Args->NumArgs, Args->ArgSizes,
                          Args->ArgTypes, Args->ArgNames,
@@ -243,26 +228,50 @@ EXTERN int __tgt_target_kernel(ident_t *Loc, int64_t DeviceId, int32_t NumTeams,
     NumTeams = 0;
 
   DeviceTy &Device = *PM->Devices[DeviceId];
-  AsyncInfoTy AsyncInfo(Device);
-  int Rc = target(Loc, Device, HostPtr, Args->NumArgs, Args->ArgBasePtrs,
-                  Args->ArgPtrs, Args->ArgSizes, Args->ArgTypes, Args->ArgNames,
-                  Args->ArgMappers, NumTeams, ThreadLimit, Args->Tripcount,
-                  IsTeams, AsyncInfo);
+  TargetAsyncInfoTy TargetAsyncInfo(Device);
+  AsyncInfoTy &AsyncInfo = TargetAsyncInfo;
+
+  int Rc = OFFLOAD_SUCCESS;
+  Rc = target(Loc, Device, HostPtr, Args->NumArgs, Args->ArgBasePtrs,
+              Args->ArgPtrs, Args->ArgSizes, Args->ArgTypes, Args->ArgNames,
+              Args->ArgMappers, NumTeams, ThreadLimit, Args->Tripcount, IsTeams,
+              AsyncInfo);
+
   if (Rc == OFFLOAD_SUCCESS)
     Rc = AsyncInfo.synchronize();
+
   handleTargetOutcome(Rc == OFFLOAD_SUCCESS, Loc);
   assert(Rc == OFFLOAD_SUCCESS && "__tgt_target_kernel unexpected failure!");
+
   return OMP_TGT_SUCCESS;
 }
 
+/// Implements a kernel entry that executes the target region on the specified
+/// device.
+///
+/// \param Loc Source location associated with this target region.
+/// \param DeviceId The device to execute this region, -1 indicated the default.
+/// \param NumTeams Number of teams to launch the region with, -1 indicates a
+///                 non-teams region and 0 indicates it was unspecified.
+/// \param ThreadLimit Limit to the number of threads to use in the kernel
+///                    launch, 0 indicates it was unspecified.
+/// \param HostPtr  The pointer to the host function registered with the kernel.
+/// \param Args     All arguments to this kernel launch (see struct definition).
+EXTERN int __tgt_target_kernel(ident_t *Loc, int64_t DeviceId, int32_t NumTeams,
+                               int32_t ThreadLimit, void *HostPtr,
+                               __tgt_kernel_arguments *Args) {
+  TIMESCOPE_WITH_IDENT(Loc);
+  return targetKernel<AsyncInfoTy>(Loc, DeviceId, NumTeams, ThreadLimit,
+                                   HostPtr, Args);
+}
+
 EXTERN int __tgt_target_kernel_nowait(
     ident_t *Loc, int64_t DeviceId, int32_t NumTeams, int32_t ThreadLimit,
     void *HostPtr, __tgt_kernel_arguments *Args, int32_t DepNum, void *DepList,
     int32_t NoAliasDepNum, void *NoAliasDepList) {
   TIMESCOPE_WITH_IDENT(Loc);
-
-  return __tgt_target_kernel(Loc, DeviceId, NumTeams, ThreadLimit, HostPtr,
-                             Args);
+  return targetKernel<TaskAsyncInfoWrapperTy>(Loc, DeviceId, NumTeams,
+                                              ThreadLimit, HostPtr, Args);
 }
 
 // Get the current number of components for a user-defined mapper.
@@ -303,3 +312,43 @@ EXTERN int __tgt_print_device_info(int64_t DeviceId) {
   return PM->Devices[DeviceId]->printDeviceInfo(
       PM->Devices[DeviceId]->RTLDeviceID);
 }
+
+EXTERN void __tgt_target_nowait_query(void **AsyncHandle) {
+  if (!AsyncHandle || !*AsyncHandle) {
+    FATAL_MESSAGE0(
+        1, "Receive an invalid async handle from the current OpenMP task. Is "
+           "this a target nowait region?\n");
+  }
+
+  // Exponential backoff tries to optimally decide if a thread should just query
+  // for the device operations (work/spin wait on them) or block until they are
+  // completed (use device side blocking mechanism). This allows the runtime to
+  // adapt itself when there are a lot of long-running target regions in-flight.
+  using namespace llvm::omp::target;
+  static thread_local ExponentialBackoff QueryCounter(
+      Int64Envar("OMPTARGET_QUERY_COUNT_MAX", 10),
+      Int64Envar("OMPTARGET_QUERY_COUNT_THRESHOLD", 5),
+      Envar<float>("OMPTARGET_QUERY_COUNT_BACKOFF_FACTOR", 0.5f));
+
+  auto *AsyncInfo = (AsyncInfoTy *)*AsyncHandle;
+
+  // If the thread is actively waiting on too many target nowait regions, we
+  // should use the blocking sync type.
+  if (QueryCounter.isAboveThreshold())
+    AsyncInfo->SyncType = AsyncInfoTy::SyncTy::BLOCKING;
+
+  // If there are device operations still pending, return immediately without
+  // deallocating the handle and increase the current thread query count.
+  if (!AsyncInfo->isDone()) {
+    QueryCounter.increment();
+    return;
+  }
+
+  // When a thread successfully completes a target nowait region, we
+  // exponentially backoff its query counter by the query factor.
+  QueryCounter.decrement();
+
+  // Delete the handle and unset it from the OpenMP task data.
+  delete AsyncInfo;
+  *AsyncHandle = nullptr;
+}
index 5b1a119..f9c5b95 100644 (file)
@@ -24,13 +24,25 @@ using llvm::SmallVector;
 
 int AsyncInfoTy::synchronize() {
   int Result = OFFLOAD_SUCCESS;
-  if (AsyncInfo.Queue) {
-    // If we have a queue we need to synchronize it now.
-    Result = Device.synchronize(*this);
-    assert(AsyncInfo.Queue == nullptr &&
-           "The device plugin should have nulled the queue to indicate there "
-           "are no outstanding actions!");
+  if (!isQueueEmpty()) {
+    switch (SyncType) {
+    case SyncTy::BLOCKING:
+      // If we have a queue we need to synchronize it now.
+      Result = Device.synchronize(*this);
+      assert(AsyncInfo.Queue == nullptr &&
+             "The device plugin should have nulled the queue to indicate there "
+             "are no outstanding actions!");
+      break;
+    case SyncTy::NON_BLOCKING:
+      Result = Device.queryAsync(*this);
+      break;
+    }
   }
+
+  // Run any pending post-processing function registered on this async object.
+  if (Result == OFFLOAD_SUCCESS && isQueueEmpty())
+    Result = runPostProcessing();
+
   return Result;
 }
 
@@ -39,6 +51,30 @@ void *&AsyncInfoTy::getVoidPtrLocation() {
   return BufferLocations.back();
 }
 
+bool AsyncInfoTy::isDone() {
+  synchronize();
+  // The async info operations are completed when the internal queue is empty.
+  return isQueueEmpty();
+}
+
+int32_t AsyncInfoTy::runPostProcessing() {
+  size_t Size = PostProcessingFunctions.size();
+  for (size_t I = 0; I < Size; ++I) {
+    const int Result = PostProcessingFunctions[I]();
+    if (Result != OFFLOAD_SUCCESS)
+      return Result;
+  }
+
+  // Clear the vector up until the last known function, since post-processing
+  // procedures might add new procedures themselves.
+  const auto PrevBegin = PostProcessingFunctions.begin();
+  PostProcessingFunctions.erase(PrevBegin, PrevBegin + Size);
+
+  return OFFLOAD_SUCCESS;
+}
+
+bool AsyncInfoTy::isQueueEmpty() const { return AsyncInfo.Queue == nullptr; }
+
 /* All begin addresses for partially mapped structs must be 8-aligned in order
  * to ensure proper alignment of members. E.g.
  *
@@ -696,12 +732,89 @@ static void applyToShadowMapEntries(DeviceTy &Device, CBTy CB, void *Begin,
 
 } // namespace
 
+/// Applies the necessary post-processing procedures to entries listed in \p
+/// EntriesInfo after the execution of all device side operations from a target
+/// data end. This includes the update of pointers at the host and removal of
+/// device buffer when needed. It returns OFFLOAD_FAIL or OFFLOAD_SUCCESS
+/// according to the successfulness of the operations.
+static int
+postProcessingTargetDataEnd(DeviceTy *Device,
+                            SmallVector<PostProcessingInfo> EntriesInfo,
+                            void *FromMapperBase) {
+  int Ret = OFFLOAD_SUCCESS;
+
+  for (PostProcessingInfo &Info : EntriesInfo) {
+    // If we marked the entry to be deleted we need to verify no other
+    // thread reused it by now. If deletion is still supposed to happen by
+    // this thread LR will be set and exclusive access to the HDTT map
+    // will avoid another thread reusing the entry now. Note that we do
+    // not request (exclusive) access to the HDTT map if Info.DelEntry is
+    // not set.
+    LookupResult LR;
+    DeviceTy::HDTTMapAccessorTy HDTTMap =
+        Device->HostDataToTargetMap.getExclusiveAccessor(!Info.DelEntry);
+
+    if (Info.DelEntry) {
+      LR = Device->lookupMapping(HDTTMap, Info.HstPtrBegin, Info.DataSize);
+      if (LR.Entry->getTotalRefCount() != 0 ||
+          LR.Entry->getDeleteThreadId() != std::this_thread::get_id()) {
+        // The thread is not in charge of deletion anymore. Give up access
+        // to the HDTT map and unset the deletion flag.
+        HDTTMap.destroy();
+        Info.DelEntry = false;
+      }
+    }
+
+    // If we copied back to the host a struct/array containing pointers,
+    // we need to restore the original host pointer values from their
+    // shadow copies. If the struct is going to be deallocated, remove any
+    // remaining shadow pointer entries for this struct.
+    auto CB = [&](ShadowPtrListTy::iterator &Itr) {
+      // If we copied the struct to the host, we need to restore the
+      // pointer.
+      if (Info.ArgType & OMP_TGT_MAPTYPE_FROM) {
+        void **ShadowHstPtrAddr = (void **)Itr->first;
+        *ShadowHstPtrAddr = Itr->second.HstPtrVal;
+        DP("Restoring original host pointer value " DPxMOD " for host "
+           "pointer " DPxMOD "\n",
+           DPxPTR(Itr->second.HstPtrVal), DPxPTR(ShadowHstPtrAddr));
+      }
+      // If the struct is to be deallocated, remove the shadow entry.
+      if (Info.DelEntry) {
+        DP("Removing shadow pointer " DPxMOD "\n", DPxPTR((void **)Itr->first));
+        auto OldItr = Itr;
+        Itr++;
+        Device->ShadowPtrMap.erase(OldItr);
+      } else {
+        ++Itr;
+      }
+      return OFFLOAD_SUCCESS;
+    };
+    applyToShadowMapEntries(*Device, CB, Info.HstPtrBegin, Info.DataSize,
+                            Info.TPR);
+
+    // If we are deleting the entry the DataMapMtx is locked and we own
+    // the entry.
+    if (Info.DelEntry) {
+      if (!FromMapperBase || FromMapperBase != Info.HstPtrBegin)
+        Ret = Device->deallocTgtPtr(HDTTMap, LR, Info.DataSize);
+
+      if (Ret != OFFLOAD_SUCCESS) {
+        REPORT("Deallocating data from device failed.\n");
+        break;
+      }
+    }
+  }
+
+  return Ret;
+}
+
 /// Internal function to undo the mapping and retrieve the data from the device.
 int targetDataEnd(ident_t *Loc, DeviceTy &Device, int32_t ArgNum,
                   void **ArgBases, void **Args, int64_t *ArgSizes,
                   int64_t *ArgTypes, map_var_info_t *ArgNames,
                   void **ArgMappers, AsyncInfoTy &AsyncInfo, bool FromMapper) {
-  int Ret;
+  int Ret = OFFLOAD_SUCCESS;
   SmallVector<PostProcessingInfo> PostProcessingPtrs;
   void *FromMapperBase = nullptr;
   // process each input.
@@ -861,75 +974,15 @@ int targetDataEnd(ident_t *Loc, DeviceTy &Device, int32_t ArgNum,
     }
   }
 
-  // TODO: We should not synchronize here but pass the AsyncInfo object to the
-  //       allocate/deallocate device APIs.
-  //
-  // We need to synchronize before deallocating data.
-  Ret = AsyncInfo.synchronize();
-  if (Ret != OFFLOAD_SUCCESS)
-    return OFFLOAD_FAIL;
-
-  // Deallocate target pointer
-  for (PostProcessingInfo &Info : PostProcessingPtrs) {
-    // If we marked the entry to be deleted we need to verify no other thread
-    // reused it by now. If deletion is still supposed to happen by this thread
-    // LR will be set and exclusive access to the HDTT map will avoid another
-    // thread reusing the entry now. Note that we do not request (exclusive)
-    // access to the HDTT map if Info.DelEntry is not set.
-    LookupResult LR;
-    DeviceTy::HDTTMapAccessorTy HDTTMap =
-        Device.HostDataToTargetMap.getExclusiveAccessor(!Info.DelEntry);
-
-    if (Info.DelEntry) {
-      LR = Device.lookupMapping(HDTTMap, Info.HstPtrBegin, Info.DataSize);
-      if (LR.Entry->getTotalRefCount() != 0 ||
-          LR.Entry->getDeleteThreadId() != std::this_thread::get_id()) {
-        // The thread is not in charge of deletion anymore. Give up access to
-        // the HDTT map and unset the deletion flag.
-        HDTTMap.destroy();
-        Info.DelEntry = false;
-      }
-    }
-
-    // If we copied back to the host a struct/array containing pointers, we
-    // need to restore the original host pointer values from their shadow
-    // copies. If the struct is going to be deallocated, remove any remaining
-    // shadow pointer entries for this struct.
-    auto CB = [&](ShadowPtrListTy::iterator &Itr) {
-      // If we copied the struct to the host, we need to restore the pointer.
-      if (Info.ArgType & OMP_TGT_MAPTYPE_FROM) {
-        void **ShadowHstPtrAddr = (void **)Itr->first;
-        *ShadowHstPtrAddr = Itr->second.HstPtrVal;
-        DP("Restoring original host pointer value " DPxMOD " for host "
-           "pointer " DPxMOD "\n",
-           DPxPTR(Itr->second.HstPtrVal), DPxPTR(ShadowHstPtrAddr));
-      }
-      // If the struct is to be deallocated, remove the shadow entry.
-      if (Info.DelEntry) {
-        DP("Removing shadow pointer " DPxMOD "\n", DPxPTR((void **)Itr->first));
-        auto OldItr = Itr;
-        Itr++;
-        Device.ShadowPtrMap.erase(OldItr);
-      } else {
-        ++Itr;
-      }
-      return OFFLOAD_SUCCESS;
-    };
-    applyToShadowMapEntries(Device, CB, Info.HstPtrBegin, Info.DataSize,
-                            Info.TPR);
-
-    // If we are deleting the entry the DataMapMtx is locked and we own the
-    // entry.
-    if (Info.DelEntry) {
-      if (!FromMapperBase || FromMapperBase != Info.HstPtrBegin)
-        Ret = Device.deallocTgtPtr(HDTTMap, LR, Info.DataSize);
-
-      if (Ret != OFFLOAD_SUCCESS) {
-        REPORT("Deallocating data from device failed.\n");
-        break;
-      }
-    }
-  }
+  // Add post-processing functions
+  // TODO: We might want to remove `mutable` in the future by not changing the
+  // captured variables somehow.
+  AsyncInfo.addPostProcessingFunction(
+      [=, Device = &Device,
+       PostProcessingPtrs = std::move(PostProcessingPtrs)]() mutable -> int {
+        return postProcessingTargetDataEnd(Device, PostProcessingPtrs,
+                                           FromMapperBase);
+      });
 
   return Ret;
 }
@@ -969,20 +1022,22 @@ static int targetDataContiguous(ident_t *Loc, DeviceTy &Device, void *ArgsBase,
       return OFFLOAD_FAIL;
     }
 
-    auto CB = [&](ShadowPtrListTy::iterator &Itr) {
-      void **ShadowHstPtrAddr = (void **)Itr->first;
-      // Wait for device-to-host memcopies for whole struct to complete,
-      // before restoring the correct host pointer.
-      if (AsyncInfo.synchronize() != OFFLOAD_SUCCESS)
-        return OFFLOAD_FAIL;
-      *ShadowHstPtrAddr = Itr->second.HstPtrVal;
-      DP("Restoring original host pointer value " DPxMOD
-         " for host pointer " DPxMOD "\n",
-         DPxPTR(Itr->second.HstPtrVal), DPxPTR(ShadowHstPtrAddr));
-      ++Itr;
+    // Wait for device-to-host memcopies for whole struct to complete,
+    // before restoring the correct host pointer.
+    AsyncInfo.addPostProcessingFunction([=, Device = &Device]() -> int {
+      auto CB = [&](ShadowPtrListTy::iterator &Itr) {
+        void **ShadowHstPtrAddr = (void **)Itr->first;
+        *ShadowHstPtrAddr = Itr->second.HstPtrVal;
+        DP("Restoring original host pointer value " DPxMOD
+           " for host pointer " DPxMOD "\n",
+           DPxPTR(Itr->second.HstPtrVal), DPxPTR(ShadowHstPtrAddr));
+        ++Itr;
+        return OFFLOAD_SUCCESS;
+      };
+      applyToShadowMapEntries(*Device, CB, HstPtrBegin, ArgSize, TPR);
+
       return OFFLOAD_SUCCESS;
-    };
-    applyToShadowMapEntries(Device, CB, HstPtrBegin, ArgSize, TPR);
+    });
   }
 
   if (ArgType & OMP_TGT_MAPTYPE_TO) {
@@ -1159,19 +1214,19 @@ class PrivateArgumentManagerTy {
   /// first-private arguments and transfer them all at once.
   struct FirstPrivateArgInfoTy {
     /// The index of the element in \p TgtArgs corresponding to the argument
-    const int Index;
+    int Index;
     /// Host pointer begin
-    const char *HstPtrBegin;
+    char *HstPtrBegin;
     /// Host pointer end
-    const char *HstPtrEnd;
+    char *HstPtrEnd;
     /// Aligned size
-    const int64_t AlignedSize;
+    int64_t AlignedSize;
     /// Host pointer name
-    const map_var_info_t HstPtrName = nullptr;
+    map_var_info_t HstPtrName = nullptr;
 
-    FirstPrivateArgInfoTy(int Index, const void *HstPtr, int64_t Size,
+    FirstPrivateArgInfoTy(int Index, void *HstPtr, int64_t Size,
                           const map_var_info_t HstPtrName = nullptr)
-        : Index(Index), HstPtrBegin(reinterpret_cast<const char *>(HstPtr)),
+        : Index(Index), HstPtrBegin(reinterpret_cast<char *>(HstPtr)),
           HstPtrEnd(HstPtrBegin + Size), AlignedSize(Size + Size % Alignment),
           HstPtrName(HstPtrName) {}
   };
@@ -1473,12 +1528,19 @@ static int processDataAfter(ident_t *Loc, int64_t DeviceId, void *HostPtr,
     return OFFLOAD_FAIL;
   }
 
-  // Free target memory for private arguments
-  Ret = PrivateArgumentManager.free();
-  if (Ret != OFFLOAD_SUCCESS) {
-    REPORT("Failed to deallocate target memory for private args\n");
-    return OFFLOAD_FAIL;
-  }
+  // Free target memory for private arguments after synchronization.
+  // TODO: We might want to remove `mutable` in the future by not changing the
+  // captured variables somehow.
+  AsyncInfo.addPostProcessingFunction(
+      [PrivateArgumentManager =
+           std::move(PrivateArgumentManager)]() mutable -> int {
+        int Ret = PrivateArgumentManager.free();
+        if (Ret != OFFLOAD_SUCCESS) {
+          REPORT("Failed to deallocate target memory for private args\n");
+          return OFFLOAD_FAIL;
+        }
+        return Ret;
+      });
 
   return OFFLOAD_SUCCESS;
 }
@@ -1530,7 +1592,7 @@ int target(ident_t *Loc, DeviceTy &Device, void *HostPtr, int32_t ArgNum,
 
   PrivateArgumentManagerTy PrivateArgumentManager(Device, AsyncInfo);
 
-  int Ret;
+  int Ret = OFFLOAD_SUCCESS;
   if (ArgNum) {
     // Process data, such as data mapping, before launching the kernel
     Ret = processDataBefore(Loc, DeviceId, HostPtr, ArgNum, ArgBases, Args,
index df2ce39..488053b 100644 (file)
@@ -117,6 +117,11 @@ void __kmpc_omp_wait_deps(ident_t *loc_ref, kmp_int32 gtid, kmp_int32 ndeps,
                           kmp_depend_info_t *dep_list, kmp_int32 ndeps_noalias,
                           kmp_depend_info_t *noalias_dep_list)
     __attribute__((weak));
+void **__kmpc_omp_get_target_async_handle_ptr(kmp_int32 gtid)
+    __attribute__((weak));
+bool __kmpc_omp_has_task_team(kmp_int32 gtid) __attribute__((weak));
+// Invalid GTID as defined by libomp; keep in sync
+#define KMP_GTID_DNE (-2)
 #ifdef __cplusplus
 }
 #endif
@@ -189,6 +194,98 @@ printKernelArguments(const ident_t *Loc, const int64_t DeviceId,
   }
 }
 
+// Wrapper for task stored async info objects.
+class TaskAsyncInfoWrapperTy {
+  const int ExecThreadID = KMP_GTID_DNE;
+  AsyncInfoTy LocalAsyncInfo;
+  AsyncInfoTy *AsyncInfo = &LocalAsyncInfo;
+  void **TaskAsyncInfoPtr = nullptr;
+
+public:
+  TaskAsyncInfoWrapperTy(DeviceTy &Device)
+      : ExecThreadID(__kmpc_global_thread_num(NULL)), LocalAsyncInfo(Device) {
+    // If we failed to acquired the current global thread id, we cannot
+    // re-enqueue the current task. Thus we should use the local blocking async
+    // info.
+    if (ExecThreadID == KMP_GTID_DNE)
+      return;
+
+    // Only tasks with an assigned task team can be re-enqueue and thus can
+    // use the non-blocking synchronization scheme. Thus we should use the local
+    // blocking async info, if we donĀ“t have one.
+    if (!__kmpc_omp_has_task_team(ExecThreadID))
+      return;
+
+    // Acquire a pointer to the AsyncInfo stored inside the current task being
+    // executed.
+    TaskAsyncInfoPtr = __kmpc_omp_get_target_async_handle_ptr(ExecThreadID);
+
+    // If we cannot acquire such pointer, fallback to using the local blocking
+    // async info.
+    if (!TaskAsyncInfoPtr)
+      return;
+
+    // When creating a new task async info, the task handle must always be
+    // invalid. We must never overwrite any task async handle and there should
+    // never be any valid handle store inside the task at this point.
+    assert((*TaskAsyncInfoPtr) == nullptr &&
+           "Task async handle is not empty when dispatching new device "
+           "operations. The handle was not cleared properly or "
+           "__tgt_target_nowait_query should have been called!");
+
+    // If no valid async handle is present, a new AsyncInfo will be allocated
+    // and stored in the current task.
+    AsyncInfo = new AsyncInfoTy(Device, AsyncInfoTy::SyncTy::NON_BLOCKING);
+    *TaskAsyncInfoPtr = (void *)AsyncInfo;
+  }
+
+  ~TaskAsyncInfoWrapperTy() {
+    // Local async info destruction is automatically handled by ~AsyncInfoTy.
+    if (AsyncInfo == &LocalAsyncInfo)
+      return;
+
+    // If the are device operations still pending, return immediately without
+    // deallocating the handle.
+    if (!AsyncInfo->isDone())
+      return;
+
+    // Delete the handle and unset it from the OpenMP task data.
+    delete AsyncInfo;
+    *TaskAsyncInfoPtr = nullptr;
+  }
+
+  operator AsyncInfoTy &() { return *AsyncInfo; }
+};
+
+// Implement exponential backoff counting.
+// Linearly increments until given maximum, exponentially decrements based on
+// given backoff factor.
+class ExponentialBackoff {
+  int64_t Count = 0;
+  const int64_t MaxCount = 0;
+  const int64_t CountThreshold = 0;
+  const float BackoffFactor = 0.0f;
+
+public:
+  ExponentialBackoff(int64_t MaxCount, int64_t CountThreshold,
+                     float BackoffFactor)
+      : MaxCount(MaxCount), CountThreshold(CountThreshold),
+        BackoffFactor(BackoffFactor) {
+    assert(MaxCount >= 0 &&
+           "ExponentialBackoff: maximum count value should be non-negative");
+    assert(CountThreshold >= 0 &&
+           "ExponentialBackoff: count threshold value should be non-negative");
+    assert(BackoffFactor >= 0 && BackoffFactor < 1 &&
+           "ExponentialBackoff: backoff factor should be in [0, 1) interval");
+  }
+
+  void increment() { Count = std::min(Count + 1, MaxCount); }
+
+  void decrement() { Count *= BackoffFactor; }
+
+  bool isAboveThreshold() const { return Count > CountThreshold; }
+};
+
 #include "llvm/Support/TimeProfiler.h"
 #define TIMESCOPE() llvm::TimeTraceScope TimeScope(__FUNCTION__)
 #define TIMESCOPE_WITH_IDENT(IDENT)                                            \
index 35b7df9..4c7f598 100644 (file)
@@ -212,6 +212,8 @@ bool RTLsTy::attemptLoadRTL(const std::string &RTLName, RTLInfoTy &RTL) {
       DynLibrary->getAddressOfSymbol("__tgt_rtl_run_target_team_region_async");
   *((void **)&RTL.synchronize) =
       DynLibrary->getAddressOfSymbol("__tgt_rtl_synchronize");
+  *((void **)&RTL.query_async) =
+      DynLibrary->getAddressOfSymbol("__tgt_rtl_query_async");
   *((void **)&RTL.data_exchange) =
       DynLibrary->getAddressOfSymbol("__tgt_rtl_data_exchange");
   *((void **)&RTL.data_exchange_async) =
index c9471a6..5982425 100644 (file)
@@ -2501,6 +2501,10 @@ typedef struct kmp_tasking_flags { /* Total struct must be exactly 32 bits */
 
 } kmp_tasking_flags_t;
 
+typedef struct kmp_target_data {
+  void *async_handle; // libomptarget async handle for task completion query
+} kmp_target_data_t;
+
 struct kmp_taskdata { /* aligned during dynamic allocation       */
   kmp_int32 td_task_id; /* id, assigned by debugger                */
   kmp_tasking_flags_t td_flags; /* task flags                              */
@@ -2543,6 +2547,7 @@ struct kmp_taskdata { /* aligned during dynamic allocation       */
 #if OMPT_SUPPORT
   ompt_task_info_t ompt_task_info;
 #endif
+  kmp_target_data_t td_target_data;
 }; // struct kmp_taskdata
 
 // Make sure padding above worked
@@ -4042,6 +4047,10 @@ KMP_EXPORT int __kmp_get_max_teams(void);
 KMP_EXPORT void __kmp_set_teams_thread_limit(int limit);
 KMP_EXPORT int __kmp_get_teams_thread_limit(void);
 
+/* Interface target task integration */
+KMP_EXPORT void **__kmpc_omp_get_target_async_handle_ptr(kmp_int32 gtid);
+KMP_EXPORT bool __kmpc_omp_has_task_team(kmp_int32 gtid);
+
 /* Lock interface routines (fast versions with gtid passed in) */
 KMP_EXPORT void __kmpc_init_lock(ident_t *loc, kmp_int32 gtid,
                                  void **user_lock);
index 1622c6a..9fc02ea 100644 (file)
@@ -21,6 +21,9 @@
 #include "ompt-specific.h"
 #endif
 
+// Declaration of synchronization function from libomptarget.
+extern "C" void __tgt_target_nowait_query(void **) KMP_WEAK_ATTRIBUTE_INTERNAL;
+
 /* forward declaration */
 static void __kmp_enable_tasking(kmp_task_team_t *task_team,
                                  kmp_info_t *this_thr);
@@ -1063,7 +1066,7 @@ static void __kmp_task_finish(kmp_int32 gtid, kmp_task_t *task,
   KMP_DEBUG_ASSERT(taskdata->td_flags.started == 1);
   KMP_DEBUG_ASSERT(taskdata->td_flags.freed == 0);
 
-  bool detach = false;
+  bool completed = true;
   if (UNLIKELY(taskdata->td_flags.detachable == TASK_DETACHABLE)) {
     if (taskdata->td_allow_completion_event.type ==
         KMP_EVENT_ALLOW_COMPLETION) {
@@ -1087,13 +1090,24 @@ static void __kmp_task_finish(kmp_int32 gtid, kmp_task_t *task,
         // __kmp_fulfill_event might free taskdata at any time from now
 
         taskdata->td_flags.proxy = TASK_PROXY; // proxify!
-        detach = true;
+        completed = false;
       }
       __kmp_release_tas_lock(&taskdata->td_allow_completion_event.lock, gtid);
     }
   }
 
-  if (!detach) {
+  // Tasks with valid target async handles must be re-enqueued.
+  if (taskdata->td_target_data.async_handle != NULL) {
+    // Note: no need to translate gtid to its shadow. If the current thread is a
+    // hidden helper one, then the gtid is already correct. Otherwise, hidden
+    // helper threads are disabled, and gtid refers to a OpenMP thread.
+    __kmpc_give_task(task, __kmp_tid_from_gtid(gtid));
+    if (KMP_HIDDEN_HELPER_THREAD(gtid))
+      __kmp_hidden_helper_worker_thread_signal();
+    completed = false;
+  }
+
+  if (completed) {
     taskdata->td_flags.complete = 1; // mark the task as completed
 
 #if OMPT_SUPPORT
@@ -1125,6 +1139,13 @@ static void __kmp_task_finish(kmp_int32 gtid, kmp_task_t *task,
     // function
     KMP_DEBUG_ASSERT(taskdata->td_flags.executing == 1);
     taskdata->td_flags.executing = 0; // suspend the finishing task
+
+    // Decrement the counter of hidden helper tasks to be executed.
+    if (taskdata->td_flags.hidden_helper) {
+      // Hidden helper tasks can only be executed by hidden helper threads.
+      KMP_ASSERT(KMP_HIDDEN_HELPER_THREAD(gtid));
+      KMP_ATOMIC_DEC(&__kmp_unexecuted_hidden_helper_tasks);
+    }
   }
 
   KA_TRACE(
@@ -1136,7 +1157,7 @@ static void __kmp_task_finish(kmp_int32 gtid, kmp_task_t *task,
   // johnmc: if an asynchronous inquiry peers into the runtime system
   // it doesn't see the freed task as the current task.
   thread->th.th_current_task = resumed_task;
-  if (!detach)
+  if (completed)
     __kmp_free_task_and_ancestors(gtid, taskdata, thread);
 
   // TODO: GEH - make sure root team implicit task is initialized properly.
@@ -1532,6 +1553,7 @@ kmp_task_t *__kmp_task_alloc(ident_t *loc_ref, kmp_int32 gtid,
       parent_task->td_taskgroup; // task inherits taskgroup from the parent task
   taskdata->td_dephash = NULL;
   taskdata->td_depnode = NULL;
+  taskdata->td_target_data.async_handle = NULL;
   if (flags->tiedness == TASK_UNTIED)
     taskdata->td_last_tied = NULL; // will be set when the task is scheduled
   else
@@ -1674,13 +1696,6 @@ static void __kmp_invoke_task(kmp_int32 gtid, kmp_task_t *task,
   }
 #endif
 
-  // Decreament the counter of hidden helper tasks to be executed
-  if (taskdata->td_flags.hidden_helper) {
-    // Hidden helper tasks can only be executed by hidden helper threads
-    KMP_ASSERT(KMP_HIDDEN_HELPER_THREAD(gtid));
-    KMP_ATOMIC_DEC(&__kmp_unexecuted_hidden_helper_tasks);
-  }
-
   // Proxy tasks are not handled by the runtime
   if (taskdata->td_flags.proxy != TASK_PROXY) {
     __kmp_task_start(gtid, task, current_task); // OMPT only if not discarded
@@ -1783,7 +1798,12 @@ static void __kmp_invoke_task(kmp_int32 gtid, kmp_task_t *task,
     KMP_FSYNC_ACQUIRED(taskdata); // acquired self (new task)
 #endif
 
-    if (task->routine != NULL) {
+    if (taskdata->td_target_data.async_handle != NULL) {
+      // If we have a valid target async handle, that means that we have already
+      // executed the task routine once. We must query for the handle completion
+      // instead of re-executing the routine.
+      __tgt_target_nowait_query(&taskdata->td_target_data.async_handle);
+    } else if (task->routine != NULL) {
 #ifdef KMP_GOMP_COMPAT
       if (taskdata->td_flags.native) {
         ((void (*)(void *))(*(task->routine)))(task->shareds);
@@ -5131,3 +5151,45 @@ void __kmpc_taskloop_5(ident_t *loc, int gtid, kmp_task_t *task, int if_val,
                  modifier, task_dup);
   KA_TRACE(20, ("__kmpc_taskloop_5(exit): T#%d\n", gtid));
 }
+
+/*!
+@ingroup TASKING
+@param gtid Global Thread ID of current thread
+@return Returns a pointer to the thread's current task async handle. If no task
+is present or gtid is invalid, returns NULL.
+
+Acqurires a pointer to the target async handle from the current task.
+*/
+void **__kmpc_omp_get_target_async_handle_ptr(kmp_int32 gtid) {
+  if (gtid == KMP_GTID_DNE)
+    return NULL;
+
+  kmp_info_t *thread = __kmp_thread_from_gtid(gtid);
+  kmp_taskdata_t *taskdata = thread->th.th_current_task;
+
+  if (!taskdata)
+    return NULL;
+
+  return &taskdata->td_target_data.async_handle;
+}
+
+/*!
+@ingroup TASKING
+@param gtid Global Thread ID of current thread
+@return Returns TRUE if the current task being executed of the given thread has
+a task team allocated to it. Otherwise, returns FALSE.
+
+Checks if the current thread has a task team.
+*/
+bool __kmpc_omp_has_task_team(kmp_int32 gtid) {
+  if (gtid == KMP_GTID_DNE)
+    return FALSE;
+
+  kmp_info_t *thread = __kmp_thread_from_gtid(gtid);
+  kmp_taskdata_t *taskdata = thread->th.th_current_task;
+
+  if (!taskdata)
+    return FALSE;
+
+  return taskdata->td_task_team != NULL;
+}