filter/python3: support multi-threaded python custon filters.
authorMyungJoo Ham <myungjoo.ham@samsung.com>
Thu, 7 Jul 2022 08:04:49 +0000 (17:04 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Mon, 11 Jul 2022 08:13:27 +0000 (17:13 +0900)
Use Python GIL for multi-threaded python custom filters.
Don't use a local lock, use GIL.

Fixes #3822

Signed-off-by: MyungJoo Ham <myungjoo.ham@samsung.com>
ps. The execution result of CV2 enabled python code of #3822:

```
$ gst-launch-1.0 videotestsrc num-buffers=10 ! video/x-raw,format=RGB,width=280,height=40 ! tensor_converter ! tensor_filter framework=python3 model=p2.py input=3:280:40:1 inputtype=uint8 output=3:280:40:1 outputtype=uint8  ! tensor_decoder mode=direct_video ! videoconvert ! ximagesink
init_filter_py:844
Setting pipeline to PAUSED ...
Pipeline is PREROLLING ...
--- USE_CV2:True /source/AutoDrv/NNStreamer/tests/p2.py (33600,)
Pipeline is PREROLLED ...
Setting pipeline to PLAYING ...
New clock: GstSystemClock
--- USE_CV2:True /source/AutoDrv/NNStreamer/tests/p2.py (33600,)
--- USE_CV2:True /source/AutoDrv/NNStreamer/tests/p2.py (33600,)
--- USE_CV2:True /source/AutoDrv/NNStreamer/tests/p2.py (33600,)
--- USE_CV2:True /source/AutoDrv/NNStreamer/tests/p2.py (33600,)
--- USE_CV2:True /source/AutoDrv/NNStreamer/tests/p2.py (33600,)
--- USE_CV2:True /source/AutoDrv/NNStreamer/tests/p2.py (33600,)
--- USE_CV2:True /source/AutoDrv/NNStreamer/tests/p2.py (33600,)
--- USE_CV2:True /source/AutoDrv/NNStreamer/tests/p2.py (33600,)
--- USE_CV2:True /source/AutoDrv/NNStreamer/tests/p2.py (33600,)
Got EOS from element "pipeline0".
Execution ended after 0:00:00.333332913
Setting pipeline to PAUSED ...
Setting pipeline to READY ...
Setting pipeline to NULL ...
Freeing pipeline ...
```

ext/nnstreamer/extra/nnstreamer_python3_helper.h
ext/nnstreamer/tensor_filter/tensor_filter_python3.cc

index 21714e4..fe47ba4 100644 (file)
@@ -17,6 +17,7 @@
 
 #define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
 #include <Python.h>
+#include <patchlevel.h>
 #include <dlfcn.h>
 #include <numpy/arrayobject.h>
 #include <structmember.h>
     }                      \
   } while (0)
 
+#if (PY_MAJOR_VERSION > 3) || ((PY_MAJOR_VERSION == 3) && (PY_MINOR_VERSION >= 7))
+/* Since 3.7, PyEval_InitThreads does nothing and deprecated */
+#define PyEval_InitThreads_IfGood()     do { } while (0)
+#else
+#define PyEval_InitThreads_IfGood()     do { PyEval_InitThreads(); } while (0)
+#endif
+
 extern tensor_type getTensorType (NPY_TYPES npyType);
 extern NPY_TYPES getNumpyType (tensor_type tType);
 extern int loadScript (PyObject **core_obj, const gchar *module_name, const gchar *class_name);
index 01dfc11..96cb09b 100644 (file)
@@ -21,6 +21,7 @@
  * @see     http://github.com/nnstreamer/nnstreamer
  * @author  Dongju Chae <dongju.chae@samsung.com>
  * @bug     No known bugs except for NYI items
+ * @todo    This would be better if this inherits tensor_filter_subplugin class.
  */
 
 /**
@@ -69,6 +70,9 @@ typedef enum _cb_type {
   CB_END,
 } cb_type;
 
+#define Py_LOCK() PyGILState_Ensure()
+#define Py_UNLOCK(gstate) PyGILState_Release(gstate)
+
 /**
  * @brief      Python embedding core structure
  */
@@ -96,16 +100,6 @@ class PYCore
   {
     return callback_type;
   }
-  /** @brief Lock python-related actions */
-  void Py_LOCK ()
-  {
-    g_mutex_lock (&py_mutex);
-  }
-  /** @brief Unlock python-related actions */
-  void Py_UNLOCK ()
-  {
-    g_mutex_unlock (&py_mutex);
-  }
 
   int checkTensorType (int nns_type, int np_type);
   int checkTensorSize (GstTensorMemory *output, PyArrayObject *array);
@@ -121,7 +115,6 @@ class PYCore
 
   PyObject *core_obj;
   PyObject *shape_cls;
-  GMutex py_mutex;
 
   GstTensorsInfo inputTensorMeta; /**< The tensor info of input tensors */
   GstTensorsInfo outputTensorMeta; /**< The tensor info of output tensors */
@@ -177,8 +170,6 @@ PYCore::PYCore (const char *_script_path, const char *_custom)
   configured = false;
   shape_cls = NULL;
 
-  /** to prevent concurrent Python C-API calls */
-  g_mutex_init (&py_mutex);
 }
 
 /**
@@ -190,13 +181,14 @@ PYCore::~PYCore ()
   gst_tensors_info_free (&inputTensorMeta);
   gst_tensors_info_free (&outputTensorMeta);
 
+  PyGILState_STATE gstate = Py_LOCK ();
   Py_SAFEDECREF (core_obj);
   Py_SAFEDECREF (shape_cls);
 
   PyErr_Clear ();
+  Py_UNLOCK (gstate);
 
   dlclose (handle);
-  g_mutex_clear (&py_mutex);
 }
 
 /**
@@ -206,11 +198,13 @@ PYCore::~PYCore ()
 int
 PYCore::init (const GstTensorFilterProperties *prop)
 {
+  PyGILState_STATE gstate = Py_LOCK ();
+  int ret = -EINVAL;
   /** Find nnstreamer_api module */
   PyObject *api_module = PyImport_ImportModule ("nnstreamer_python");
   if (api_module == NULL) {
     Py_ERRMSG ("Cannt find `nnstreamer_python` module");
-    return -EINVAL;
+    goto exit;
   }
 
   shape_cls = PyObject_GetAttrString (api_module, "TensorShape");
@@ -218,13 +212,16 @@ PYCore::init (const GstTensorFilterProperties *prop)
 
   if (shape_cls == NULL) {
     Py_ERRMSG ("Failed to get `TensorShape` from `nnstreamer_python` module");
-    return -EINVAL;
+    goto exit;
   }
 
   gst_tensors_info_copy (&inputTensorMeta, &prop->input_meta);
   gst_tensors_info_copy (&outputTensorMeta, &prop->output_meta);
 
-  return loadScript ();
+  ret = loadScript ();
+exit:
+  Py_UNLOCK (gstate);
+  return ret;
 }
 
 /**
@@ -252,6 +249,9 @@ PYCore::loadScript ()
   gint64 start_time = g_get_real_time ();
 #endif
 
+  int ret = -EINVAL;
+  PyGILState_STATE gstate = Py_LOCK ();
+
   PyObject *module = PyImport_ImportModule (module_name.c_str ());
   if (module) {
     PyObject *cls = PyObject_GetAttrString (module, "CustomFilter");
@@ -267,7 +267,7 @@ PYCore::loadScript ()
         if (argc < 1) {
           g_strfreev (g_args);
           ml_loge ("Cannot load python script for python-custom-filter.\n");
-          return -EINVAL;
+          goto exit;
         }
 
         py_args = PyTuple_New (argc);
@@ -294,19 +294,22 @@ PYCore::loadScript ()
           callback_type = CB_END;
       } else {
         Py_ERRMSG ("Fail to create an instance 'CustomFilter'\n");
-        return -3;
+        ret = -3;
+        goto exit;
       }
 
       Py_SAFEDECREF (cls);
     } else {
       Py_ERRMSG ("Cannot find 'CustomFilter' class in the script\n");
-      return -2;
+      ret = -2;
+      goto exit;
     }
 
     Py_SAFEDECREF (module);
   } else {
     Py_ERRMSG ("the script is not properly loaded\n");
-    return -1;
+    ret = -1;
+    goto exit;
   }
 
   configured = true;
@@ -316,7 +319,10 @@ PYCore::loadScript ()
   g_message ("Script is loaded: %" G_GINT64_FORMAT, (stop_time - start_time));
 #endif
 
-  return 0;
+  ret = 0;
+exit:
+  Py_UNLOCK (gstate);
+  return ret;
 }
 
 /**
@@ -366,11 +372,14 @@ PYCore::checkTensorSize (GstTensorMemory *output, PyArrayObject *array)
   if (nullptr == output || nullptr == array)
     throw std::invalid_argument ("Null pointers are given to PYCore::checkTensorSize().\n");
 
+  PyGILState_STATE gstate = Py_LOCK ();
   size_t total_size = PyArray_ITEMSIZE (array);
 
   for (int i = 0; i < PyArray_NDIM (array); i++)
     total_size *= PyArray_DIM (array, i);
 
+  Py_UNLOCK (gstate);
+
   return (output->size == total_size);
 }
 
@@ -388,7 +397,7 @@ PYCore::getInputTensorDim (GstTensorsInfo *info)
   if (nullptr == info)
     throw std::invalid_argument ("A null pointer is given to PYCore::getInputTensorDim().\n");
 
-  Py_LOCK ();
+  PyGILState_STATE gstate = Py_LOCK ();
 
   PyObject *result = PyObject_CallMethod (core_obj, (char *)"getInputDim", NULL);
   if (result) {
@@ -399,7 +408,7 @@ PYCore::getInputTensorDim (GstTensorsInfo *info)
     res = -1;
   }
 
-  Py_UNLOCK ();
+  Py_UNLOCK (gstate);
 
   return res;
 }
@@ -418,7 +427,7 @@ PYCore::getOutputTensorDim (GstTensorsInfo *info)
   if (nullptr == info)
     throw std::invalid_argument ("A null pointer is given to PYCore::getOutputTensorDim().\n");
 
-  Py_LOCK ();
+  PyGILState_STATE gstate = Py_LOCK ();
 
   PyObject *result = PyObject_CallMethod (core_obj, (char *)"getOutputDim", NULL);
   if (result) {
@@ -429,7 +438,7 @@ PYCore::getOutputTensorDim (GstTensorsInfo *info)
     res = -1;
   }
 
-  Py_UNLOCK ();
+  Py_UNLOCK (gstate);
 
   return res;
 }
@@ -449,7 +458,7 @@ PYCore::setInputTensorDim (const GstTensorsInfo *in_info, GstTensorsInfo *out_in
   if (nullptr == in_info || nullptr == out_info)
     throw std::invalid_argument ("Null pointers are given to PYCore::setInputTensorDim().\n");
 
-  Py_LOCK ();
+  PyGILState_STATE gstate = Py_LOCK ();
 
   /** to Python list object */
   PyObject *param = PyList_New (in_info->num_tensors);
@@ -480,7 +489,7 @@ PYCore::setInputTensorDim (const GstTensorsInfo *in_info, GstTensorsInfo *out_in
     res = -1;
   }
 
-  Py_UNLOCK ();
+  Py_UNLOCK (gstate);
 
   return res;
 }
@@ -493,6 +502,7 @@ void
 PYCore::freeOutputTensors (void *data)
 {
   std::map<void *, PyArrayObject *>::iterator it;
+  PyGILState_STATE gstate = Py_LOCK ();
 
   it = outputArrayMap.find (data);
   if (it != outputArrayMap.end ()) {
@@ -501,6 +511,8 @@ PYCore::freeOutputTensors (void *data)
   } else {
     ml_loge ("Cannot find output data: 0x%lx", (unsigned long)data);
   }
+
+  Py_UNLOCK (gstate);
 }
 
 /**
@@ -515,6 +527,7 @@ int
 PYCore::run (const GstTensorMemory *input, GstTensorMemory *output)
 {
   int res = 0;
+  PyObject *result;
 
 #if (DBG)
   gint64 start_time = g_get_real_time ();
@@ -523,7 +536,7 @@ PYCore::run (const GstTensorMemory *input, GstTensorMemory *output)
   if (nullptr == output || nullptr == input)
     throw std::invalid_argument ("Null pointers are given to PYCore::run().\n");
 
-  Py_LOCK ();
+  PyGILState_STATE gstate = Py_LOCK ();
 
   PyObject *param = PyList_New (inputTensorMeta.num_tensors);
   for (unsigned int i = 0; i < inputTensorMeta.num_tensors; i++) {
@@ -536,8 +549,10 @@ PYCore::run (const GstTensorMemory *input, GstTensorMemory *output)
     PyList_SetItem (param, i, input_array);
   }
 
-  PyObject *result
+
+  result
       = PyObject_CallMethod (core_obj, (char *)"invoke", (char *)"(O)", param);
+
   if (result) {
     if ((unsigned int)PyList_Size (result) != outputTensorMeta.num_tensors) {
       res = -EINVAL;
@@ -571,7 +586,7 @@ PYCore::run (const GstTensorMemory *input, GstTensorMemory *output)
 
 exit_decref:
   Py_SAFEDECREF (param);
-  Py_UNLOCK ();
+  Py_UNLOCK (gstate);
 
 #if (DBG)
   gint64 stop_time = g_get_real_time ();
@@ -729,15 +744,18 @@ py_loadScriptFile (const GstTensorFilterProperties *prop, void **private_data)
   /* init null */
   *private_data = NULL;
 
+  PyGILState_STATE gstate = PyGILState_Ensure();
   core = new PYCore (script_path, prop->custom_properties);
   if (core == NULL) {
     g_printerr ("Failed to allocate memory for filter subplugin: Python\n");
+    PyGILState_Release (gstate);
     return -1;
   }
 
   if (core->init (prop) != 0) {
     delete core;
     g_printerr ("failed to initailize the object: Python\n");
+    PyGILState_Release (gstate);
     return -2;
   }
 
@@ -745,8 +763,10 @@ py_loadScriptFile (const GstTensorFilterProperties *prop, void **private_data)
   if (core->getCbType () != CB_SETDIM && core->getCbType () != CB_GETDIM) {
     delete core;
     g_printerr ("Wrong callback type\n");
+    PyGILState_Release (gstate);
     return -2;
   }
+  PyGILState_Release (gstate);
 
   *private_data = core;
 
@@ -805,6 +825,7 @@ static GstTensorFilterFramework NNS_support_python = {.version = GST_TENSOR_FILT
        .allocateInInvoke = nullptr,
    } } };
 
+static PyThreadState* st;
 /** @brief Initialize this object for tensor_filter subplugin runtime register */
 void
 init_filter_py (void)
@@ -812,6 +833,8 @@ init_filter_py (void)
   /** Python should be initialized and finalized only once */
   if (!Py_IsInitialized ())
     Py_Initialize ();
+  PyEval_InitThreads_IfGood ();
+  st = PyEval_SaveThread();
 
   nnstreamer_filter_probe (&NNS_support_python);
   nnstreamer_filter_set_custom_property_desc (filter_subplugin_python,
@@ -824,6 +847,7 @@ init_filter_py (void)
 void
 fini_filter_py (void)
 {
+  PyEval_RestoreThread (st);
   nnstreamer_filter_exit (NNS_support_python.v0.name);
 
   /** Python should be initialized and finalized only once */