[Tizen/API] Implement src/sink/switch/valve handlers
authorMyungJoo Ham <myungjoo.ham@samsung.com>
Fri, 15 Mar 2019 00:44:54 +0000 (09:44 +0900)
committerMyungJoo Ham <myungjoo.ham@samsung.com>
Mon, 25 Mar 2019 06:08:32 +0000 (15:08 +0900)
Tizen-API implementation to handle
- tensor_sink
- app_src
- input_selector
- output_selector
- valve

Unit test cases and code clean/compaction commit will follow soon.

Signed-off-by: MyungJoo Ham <myungjoo.ham@samsung.com>
tizen-api/include/tizen-api-private.h
tizen-api/include/tizen-api.h
tizen-api/src/tizen-api-pipeline.c

index 54f25d9..9338813 100644 (file)
@@ -69,6 +69,8 @@ typedef struct _element {
   size_t size;
 
   GList *handles;
+  int maxid; /**< to allocate id for each handle */
+
   GMutex lock; /**< Lock for internal values */
 } element;
 
index fa1ce63..bc370f9 100644 (file)
@@ -77,16 +77,18 @@ typedef enum {
   NNS_ERROR_INVALID_PARAMETER                  = TIZEN_ERROR_INVALID_PARAMETER, /**< Invalid parameter */
   NNS_ERROR_NOT_SUPPORTED                      = TIZEN_ERROR_NOT_SUPPORTED, /**< The feature is not supported */
   NNS_ERROR_PIPELINE_FAIL                      = TIZEN_ERROR_STREAMS_PIPE, /**< Cannot create or access Gstreamer pipeline. */
+  NNS_ERROR_TRY_AGAIN                          = TIZEN_ERROR_TRY_AGAIN, /**< The pipeline is not ready, yet (not negotiated, yet) */
 } nns_error_e;
 
 /**
  * @brief Enumeration for buffer deallocation policies.
  * @since_tizen 5.5
- * @todo The list is to be filled! (NYI)
+ * @todo The list is to be filled! (NYI): "Memcpy Mode"? "GstBuffer Mode"?
  */
 typedef enum {
-  NNS_BUF_FREE_BY_NNSTREAMER                   = 0, /**< Default. Application should not deallocate this buffer. NNStreamer will deallocate when the buffer is no more needed */
-  NNS_BUF_DO_NOT_FREE                          = 1, /**< This buffer is not to be freed. */
+  NNS_BUF_FREE_BY_NNSTREAMER,  /**< Default. Application should not deallocate this buffer. NNStreamer will deallocate when the buffer is no more needed */
+  NNS_BUF_DO_NOT_FREE1,                /**< This buffer is not to be freed by NNStreamer (i.e., it's a static object). However, be careful: NNStreamer might be accessing this object after the return of the API call. */
+  NNS_BUF_POLICY_MAX,
 } nns_buf_policy;
 
 /**
@@ -137,7 +139,7 @@ typedef enum {
  * @detail If an application wants to accept data outputs of an nnstreamer stream, use this callback to get data from the stream. Note that the buffer may be deallocated after the return and this is synchronously called. Thus, if you need the data afterwards, copy the data to another buffer and return fast. Do not hold too much time in the callback. It is recommended to use very small tensors at sinks.
  * @since_tizen 5.5
  * @param[in] buf The contents of the tensor output (a single frame. tensor/tensors). Number of buf is determined by tensorsinfo->num_tensors.
- * @param[in] size The size of the buffer. Number of size is determined by tensorsinfo->num_tensors.
+ * @param[in] size The size of the buffer. Number of size is determined by tensorsinfo->num_tensors. Note that max num_tensors is 16 (NNS_TENSOR_SIZE_LIMIT).
  * @param[in] tensorsinfo The cardinality, dimension, and type of given tensor/tensors.
  * @param[in/out] pdata User Application's Private Data
  */
@@ -223,6 +225,9 @@ int nns_pipeline_stop (nns_pipeline_h pipe);
  * @param[in] pdata Private data for the callback. This value is passed to the callback when it's invoked.
  * @return @c 0 on success. otherwise a negative error value
  * @retval #NNS_ERROR_NONE Successful
+ * @retval #NNS_ERROR_INVALID_PARAMETER Given parameter is invalid. (pipe is NULL, sinkname is not found, or sinkname has an invalid type.)
+ *
+ * @todo Allow to use GstBuffer instead of buf/size pairs of cb, probably with yet another API.
  */
 int nns_pipeline_sink_register
 (nns_pipeline_h pipe, const char *sinkname, nns_sink_cb cb, nns_sink_h *h, void *pdata);
@@ -233,6 +238,7 @@ int nns_pipeline_sink_register
  * @param[in] The nns_sink_handle to be unregistered (destroyed)
  * @return @c 0 on success. otherwise a negative error value
  * @retval #NNS_ERROR_NONE Successful
+ * @retval #NNS_ERROR_INVALID_PARAMETER Given parameter is invalid.
  */
 int nns_pipeline_sink_unregister (nns_sink_h h);
 
@@ -245,6 +251,7 @@ int nns_pipeline_sink_unregister (nns_sink_h h);
  * @param[out] h The src handle.
  * @return 0 on success (buf is filled). otherwise a negative error value.
  * @retval #NNS_ERROR_NONE Successful
+ * @retval #NNS_ERROR_INVALID_PARAMETER Given parameter is invalid.
  */
 int nns_pipeline_src_gethandle
 (nns_pipeline_h pipe, const char *srcname, GstTensorsInfo *tensorsinfo, nns_src_h *h);
@@ -255,6 +262,7 @@ int nns_pipeline_src_gethandle
  * @param[in] h The src handle to be put (closed).
  * @return 0 on success (buf is filled). otherwise a negative error value.
  * @retval #NNS_ERROR_NONE Successful
+ * @retval #NNS_ERROR_INVALID_PARAMETER Given parameter is invalid.
  */
 int nns_pipeline_src_puthandle (nns_src_h h);
 
@@ -262,13 +270,18 @@ int nns_pipeline_src_puthandle (nns_src_h h);
  * @brief Put an input data frame.
  * @param[in] h The nns_src_handle returned by nns_pipeline_gethandle().
  * @param[out] policy The policy of buf deallocation.
- * @param[in] buf The input buffer, in the format of tensorsinfo given by nns_pipeline_gethandle()
- * @param[in] size The size of input buffer. This must be consistent with the given tensorsinfo.
+ * @param[in] buf The input buffers, in the format of tensorsinfo given by nns_pipeline_gethandle()
+ * @param[in] size The sizes of input buffers. This must be consistent with the given tensorsinfo, probed by nns_pipeline_src_gethandle().
+ * @param[in] num_tensors The number of tensors (number of buf and number of size) for the input frame. This must be consistent with the given tensorinfo, probed by nns_pipeline_src_gethandle(). MAX is 16 (NNS_TENSOR_SIZE_LIMIT).
  * @return 0 on success (buf is filled). otherwise a negative error value.
  * @retval #NNS_ERROR_NONE Successful
+ * @retval #NNS_ERROR_INVALID_PARAMETER Given parameter is invalid.
+ * @retval #NNS_ERROR_PIPELINE_FAIL The pipeline has inconsistent padcaps. Not negotiated?
+ *
+ * @todo Allow to use GstBuffer instead of buf/size pairs, probably with yet another API.
  */
 int nns_pipeline_src_inputdata (nns_src_h h,
-    nns_buf_policy policy, char *buf, size_t size);
+    nns_buf_policy policy, char *buf[], const size_t size[], unsigned int num_tensors);
 
 /****************************************************
  ** NNStreamer Pipeline Switch/Valve Control       **
@@ -280,14 +293,13 @@ int nns_pipeline_src_inputdata (nns_src_h h,
  *         Refer to https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer-plugins/html/gstreamer-plugins-output-selector.html for output selectors
  * @param[in] pipe The pipeline to be managed.
  * @param[in] switchname The name of switch (InputSelector/OutputSelector)
- * @param[out] num_nodes The number of nodes selected by this switch node. We assume they are listed from 0 ... num_nodes-1.
- * @param[in] type The type of the switch.
+ * @param[out] type The type of the switch. If NULL, it is ignored.
  * @param[out] h The switch handle.
  * @return 0 on success (buf is filled). otherwise a negative error value.
  * @retval #NNS_ERROR_NONE Successful
  */
 int nns_pipeline_switch_gethandle
-(nns_pipeline_h pipe, const char *switchname, int *num_nodes, nns_switch_type type, nns_switch_h *h);
+(nns_pipeline_h pipe, const char *switchname, nns_switch_type *type, nns_switch_h *h);
 
 /**
  * @brief Close the given switch handle
@@ -300,11 +312,20 @@ int nns_pipeline_switch_puthandle (nns_switch_h h);
 /**
  * @brief Control the switch with the given handle to select input/output nodes(pads).
  * @param[in] h The switch handle returned by nns_pipeline_switch_gethandle()
- * @param[in] node The node number, which is 0 ... num_nodes-1.
+ * @param[in] padname The name of the chosen pad to be activated. Use nns_pipeline_switch_nodelist to list the available pad names.
+ * @return @c 0 on success. otherwise a negative error value
+ * @retval #NNS_ERROR_NONE Successful
+ */
+int nns_pipeline_switch_select (nns_switch_h h, const char *padname);
+
+/**
+ * @brief Get the pad names of a switch
+ * @param[in] h The switch handle returned by nns_pipeline_switch_gethandle()
+ * @param[out] list NULL terminated array of char*. The caller must free each string (char*) in the list and free the list itself.
  * @return @c 0 on success. otherwise a negative error value
  * @retval #NNS_ERROR_NONE Successful
  */
-int nns_pipeline_switch_select (nns_switch_h h, int node);
+int nns_pipeline_switch_nodelist (nns_switch_h h, char *** list);
 
 /**
  * @brief Get a handle to operate a "GstValve" node of nnstreamer pipelines
@@ -329,11 +350,11 @@ int nns_pipeline_valve_puthandle (nns_valve_h h);
 /**
  * @brief Control the valve with the given handle
  * @param[in] h The valve handle returned by nns_pipeline_valve_gethandle()
- * @param[in] valve_open 0 to close (stop flow). 1 to open (continue flow)
+ * @param[in] valve_drop 1 to close (drop & stop the flow). 0 to open (let the flow pass)
  * @return @c 0 on success. otherwise a negative error value
  * @retval #NNS_ERROR_NONE Successful
  */
-int nns_pipeline_valve_control (nns_valve_h h, int valve_open);
+int nns_pipeline_valve_control (nns_valve_h h, int valve_drop);
 
 /**
  * @}
index 73d97ab..11ab97c 100644 (file)
@@ -31,6 +31,9 @@
 #include <nnstreamer/tensor_typedef.h>
 #include <nnstreamer/nnstreamer_plugin_api.h>
 
+#include <gst/gstbuffer.h>
+#include <gst/app/app.h>        /* To push data to pipeline */
+
 /**
  * @brief Internal function to create a refereable element in a pipeline
  */
@@ -48,6 +51,7 @@ construct_element (GstElement * e, nns_pipeline * p, const char *name,
   ret->sink = NULL;
   gst_tensors_info_init (&ret->tensorsinfo);
   ret->size = 0;
+  ret->maxid = 0;
   g_mutex_init (&ret->lock);
   return ret;
 }
@@ -116,25 +120,52 @@ cb_sink_event (GstElement * e, GstBuffer * b, gpointer data)
           memcpy (&elem->tensorsinfo, &tconfig.info, sizeof (GstTensorsInfo));
           elem->size = 0;
 
-          g_assert (elem->tensorsinfo.num_tensors == num_mems);
+          if (elem->tensorsinfo.num_tensors != num_mems) {
+            dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+                "The sink event of [%s] cannot be handled because the tensor type mismatches.",
+                elem->name);
+            gst_caps_unref (caps);
+            elem->sink = NULL;
+            g_mutex_unlock (&elem->lock);
+
+            return;
+          }
 
           for (i = 0; i < elem->tensorsinfo.num_tensors; i++) {
             size_t sz = gst_tensor_info_get_size (&elem->tensorsinfo.info[i]);
 
-            g_assert (sz == size[i]);
+            if (sz != size[i]) {
+              dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+                  "The sink event of [%s] cannot be handled because the tensor dimension mismatches.",
+                  elem->name);
+              gst_caps_unref (caps);
+              elem->sink = NULL;
+              g_mutex_unlock (&elem->lock);
+
+              return;
+            }
+
             elem->size += sz;
           }
         } else {
           elem->sink = NULL;    /* It is not valid */
           /** @todo What if it keeps being "NULL"? Exception handling at 2nd frame? */
         }
+
+        gst_caps_unref (caps);
       }
     }
   }
 
-  g_assert (gst_buffer_get_size (b) == total_size);
-  if (elem->size > 0)
-    g_assert (gst_buffer_get_size (b) == elem->size);
+  /* Get the data! */
+  if (gst_buffer_get_size (b) != total_size ||
+      (elem->size > 0 && total_size != elem->size)) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The buffersize mismatches. All the three values must be the same: %zu, %zu, %zu",
+        total_size, elem->size, gst_buffer_get_size (b));
+    g_mutex_unlock (&elem->lock);
+    return;
+  }
 
   /* Iterate e->handles, pass the data to them */
   for (l = elem->handles; l != NULL; l = l->next) {
@@ -271,6 +302,8 @@ nns_pipeline_construct (const char *pipeline_description, nns_pipeline_h * pipe)
                       gst_element_factory_get_element_type (outputs))) {
                 e = construct_element (elem, pipe_h, name,
                     NNSAPI_SWITCH_OUTPUT);
+              } else {
+                /** @todo CRITICAL HANDLE THIS! */
               }
 
               if (e != NULL)
@@ -290,6 +323,7 @@ nns_pipeline_construct (const char *pipeline_description, nns_pipeline_h * pipe)
       }
     }
     g_value_unset (&item);
+    /** @todo CRITICAL check the validity of elem=item registered in e */
     gst_iterator_free (it);
 
     g_object_unref (tensor_sink);
@@ -429,9 +463,59 @@ int
 nns_pipeline_sink_register (nns_pipeline_h pipe, const char *sinkname,
     nns_sink_cb cb, nns_sink_h * h, void *pdata)
 {
-  /** @todo NYI */
+  element *elem;
+  nns_pipeline *p = pipe;
+  nns_sink *sink;
+  int ret = NNS_ERROR_NONE;
 
-  return NNS_ERROR_NONE;
+  if (pipe == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The first argument, pipeline handle is not valid.");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  if (cb == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The callback argument, cb, is not valid.");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  g_mutex_lock (&p->lock);
+  elem = g_hash_table_lookup (p->namednodes, sinkname);
+
+  if (elem == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "There is no element named [%s] in the pipeline.", sinkname);
+    ret = NNS_ERROR_INVALID_PARAMETER;
+    goto unlock_return;
+  }
+
+  if (elem->type != NNSAPI_SINK) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The element [%s] in the pipeline is not a tensor_sink.", sinkname);
+    ret = NNS_ERROR_INVALID_PARAMETER;
+    goto unlock_return;
+  }
+
+  *h = g_new0 (nns_sink, 1);
+  sink = *h;
+
+  sink->pipe = p;
+  sink->element = elem;
+  sink->cb = cb;
+  sink->pdata = pdata;
+
+  g_mutex_lock (&elem->lock);
+
+  elem->maxid++;
+  sink->id = elem->maxid;
+  elem->handles = g_list_append (elem->handles, sink);
+
+  g_mutex_unlock (&elem->lock);
+
+unlock_return:
+  g_mutex_unlock (&p->lock);
+  return ret;
 }
 
 /**
@@ -440,21 +524,129 @@ nns_pipeline_sink_register (nns_pipeline_h pipe, const char *sinkname,
 int
 nns_pipeline_sink_unregister (nns_sink_h h)
 {
-  /** @todo NYI */
+  nns_sink *sink = h;
+  nns_pipeline *p;
+  element *elem;
+  int ret = NNS_ERROR_NONE;
 
-  return NNS_ERROR_NONE;
+  if (h == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The handle to be unregistered is invalid");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  p = sink->pipe;
+  elem = sink->element;
+
+  if (p == NULL || elem == NULL || p != elem->pipe) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The handle appears to be broken.");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  g_mutex_lock (&p->lock);
+
+  g_mutex_lock (&elem->lock);
+
+  if (NULL == g_list_find (elem->handles, sink)) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The handle does not exists.");
+    ret = NNS_ERROR_INVALID_PARAMETER;
+    goto unlock_return;
+  }
+
+  elem->handles = g_list_remove (elem->handles, sink);
+
+unlock_return:
+  g_mutex_unlock (&elem->lock);
+
+  g_mutex_unlock (&p->lock);
+
+  return ret;
 }
 
 /**
+ * @brief Implementation of policies decalred by nns_buf_policy in tizen-api.h,
+ *        "Free"
+ */
+static void
+nnsbufpolicy_free (gpointer data)
+{
+  g_free (data);
+}
+
+/**
+ * @brief Implementation of policies decalred by nns_buf_policy in tizen-api.h,
+ *        "Do Nothing"
+ */
+static void
+nnsbufpolicy_nop (gpointer data)
+{
+  /* DO NOTHING! */
+}
+
+/**
+ * @brief Implementation of policies decalred by nns_buf_policy in tizen-api.h
+ */
+static const GDestroyNotify bufpolicy[NNS_BUF_POLICY_MAX] = {
+  [NNS_BUF_FREE_BY_NNSTREAMER] = nnsbufpolicy_free,
+  [NNS_BUF_DO_NOT_FREE1] = nnsbufpolicy_nop,
+};
+
+/**
  * @brief Get a handle to operate a src (more info in tizen-api.h)
  */
 int nns_pipeline_src_gethandle
     (nns_pipeline_h pipe, const char *srcname, GstTensorsInfo * tensorsinfo,
     nns_src_h * h)
 {
-  /** @todo NYI */
+  nns_pipeline *p = pipe;
+  element *elem;
+  nns_src *src;
+  int ret = NNS_ERROR_NONE;
 
-  return NNS_ERROR_NONE;
+  if (pipe == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The first argument, pipeline handle is not valid.");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  g_mutex_lock (&p->lock);
+
+  elem = g_hash_table_lookup (p->namednodes, srcname);
+
+  if (elem == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "There is no element named [%s] in the pipeline.", srcname);
+    ret = NNS_ERROR_INVALID_PARAMETER;
+    goto unlock_return;
+  }
+
+  if (elem->type != NNSAPI_SRC) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The element [%s] in the pipeline is not a tensor_src.", srcname);
+    ret = NNS_ERROR_INVALID_PARAMETER;
+    goto unlock_return;
+  }
+
+  *h = g_new (nns_src, 1);
+  src = *h;
+
+  src->pipe = p;
+  src->element = elem;
+
+  g_mutex_lock (&elem->lock);
+
+  elem->maxid++;
+  src->id = elem->maxid;
+  elem->handles = g_list_append (elem->handles, src);
+
+  g_mutex_unlock (&elem->lock);
+
+unlock_return:
+  g_mutex_unlock (&p->lock);
+
+  return ret;
 }
 
 /**
@@ -463,9 +655,45 @@ int nns_pipeline_src_gethandle
 int
 nns_pipeline_src_puthandle (nns_src_h h)
 {
-  /** @todo NYI */
+  nns_src *src = h;
+  nns_pipeline *p;
+  element *elem;
+  int ret = NNS_ERROR_NONE;
 
-  return NNS_ERROR_NONE;
+  if (h == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pieline",
+        "The handle to be put is invalid");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  p = src->pipe;
+  elem = src->element;
+
+  if (p == NULL || elem == NULL || p != elem->pipe) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The handle appears to be broken.");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  g_mutex_lock (&p->lock);
+
+  g_mutex_lock (&elem->lock);
+
+  if (NULL == g_list_find (elem->handles, src)) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The handle does not exists.");
+    ret = NNS_ERROR_INVALID_PARAMETER;
+    goto unlock_return;
+  }
+
+  elem->handles = g_list_remove (elem->handles, src);
+
+unlock_return:
+  g_mutex_unlock (&elem->lock);
+
+  g_mutex_unlock (&p->lock);
+
+  return ret;
 }
 
 /**
@@ -473,11 +701,150 @@ nns_pipeline_src_puthandle (nns_src_h h)
  */
 int
 nns_pipeline_src_inputdata (nns_src_h h,
-    nns_buf_policy policy, char *buf, size_t size)
+    nns_buf_policy policy, char *buf[], const size_t size[],
+    unsigned int num_tensors)
 {
   /** @todo NYI */
+  nns_src *src = h;
+  nns_pipeline *p;
+  element *elem;
+  GstBuffer *buffer;
+  GstFlowReturn gret;
+  int ret = NNS_ERROR_NONE;
+  unsigned int i;
 
-  return NNS_ERROR_NONE;
+  if (h == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pieline",
+        "The handle to be put is invalid");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  if (num_tensors < 1 || num_tensors > NNS_TENSOR_SIZE_LIMIT) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pieline",
+        "The tensor size if invalid. It should be 1 ~ %u; where it is %u",
+        NNS_TENSOR_SIZE_LIMIT, num_tensors);
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  p = src->pipe;
+  elem = src->element;
+
+  if (p == NULL || elem == NULL || p != elem->pipe) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The handle appears to be broken.");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  g_mutex_lock (&p->lock);
+
+  g_mutex_lock (&elem->lock);
+
+  /** @todo This assumes that padcap is static */
+  if (elem->src == NULL) {
+    /* Get the src-pad-cap */
+    elem->src = gst_element_get_static_pad (elem->element, "src");
+
+    if (elem->src) {
+      /* srcpadcap available (negoticated) */
+      GstCaps *caps = gst_pad_get_current_caps (elem->src);
+
+      if (caps) {
+        guint n_caps = gst_caps_get_size (caps);
+        GstTensorsConfig tconfig;
+        gboolean found = FALSE;
+
+        for (i = 0; i < n_caps; i++) {
+          GstStructure *s = gst_caps_get_structure (caps, i);
+
+          found = gst_tensors_config_from_structure (&tconfig, s);
+          if (found)
+            break;
+        }
+
+        if (found) {
+          memcpy (&elem->tensorsinfo, &tconfig.info, sizeof (GstTensorsInfo));
+          elem->size = 0;
+
+          if (elem->tensorsinfo.num_tensors != num_tensors) {
+            dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+                "The src push of [%s] cannot be handled because the number of tensors in a frame mismatches. %u != %u",
+                elem->name, elem->tensorsinfo.num_tensors, num_tensors);
+            gst_caps_unref (caps);
+            elem->sink = NULL;
+            ret = NNS_ERROR_PIPELINE_FAIL;
+            goto unlock_return;
+          }
+
+          for (i = 0; i < elem->tensorsinfo.num_tensors; i++) {
+            size_t sz = gst_tensor_info_get_size (&elem->tensorsinfo.info[i]);
+
+            if (sz != size[i]) {
+              dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+                  "The sink event of [%s] cannot be handled because the tensor dimension mismatches.",
+                  elem->name);
+              gst_caps_unref (caps);
+              elem->sink = NULL;
+              ret = NNS_ERROR_INVALID_PARAMETER;
+              goto unlock_return;
+            }
+
+            elem->size += sz;
+
+            if (sz != size[i]) {
+              dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+                  "The given input tensor size (%d'th, %zu bytes) mismatches the source pad (%zu bytes)",
+                  i, size[i], sz);
+              gst_caps_unref (caps);
+              elem->sink = NULL;
+              ret = NNS_ERROR_INVALID_PARAMETER;
+              goto unlock_return;
+            }
+          }
+        } else {
+          elem->src = NULL;     /* invalid! */
+          /** @todo What if it keeps being "NULL"? */
+        }
+
+        gst_caps_unref (caps);
+      }
+    }
+  }
+
+  if (elem->size == 0) {
+    dlog_print (DLOG_WARN, "nnstreamer-capi-pipeline",
+        "The pipeline is not ready to accept inputs. The input is ignored.");
+    ret = NNS_ERROR_TRY_AGAIN;
+    goto unlock_return;
+  }
+
+  /* Create buffer to be pushed from buf[] */
+  buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY,
+      buf[0], size[0], 0, size[0], buf[0], bufpolicy[policy]);
+  for (i = 1; i < num_tensors; i++) {
+    GstBuffer *addbuffer =
+        gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, buf[i], size[i],
+        0, size[i], buf[i], bufpolicy[policy]);
+    buffer = gst_buffer_append (buffer, addbuffer);
+
+    /** @todo Verify that gst_buffer_append lists tensors/gstmem in the correct order */
+  }
+
+  /* Push the data! */
+  gret = gst_app_src_push_buffer (GST_APP_SRC (elem->element), buffer);
+
+  if (gret == GST_FLOW_FLUSHING) {
+    dlog_print (DLOG_WARN, "nnstreamer-capi-pipeline",
+        "The pipeline is not in PAUSED/PLAYING. The input may be ignored.");
+    ret = NNS_ERROR_TRY_AGAIN;
+  } else if (gret == GST_FLOW_EOS) {
+    dlog_print (DLOG_WARN, "nnstreamer-capi-pipeline",
+        "THe pipeline is in EOS state. The input is ignored.");
+    ret = NNS_ERROR_PIPELINE_FAIL;
+  }
+unlock_return:
+  g_mutex_unlock (&elem->lock);
+  g_mutex_unlock (&p->lock);
+  return ret;
 }
 
 /****************************************************
@@ -487,35 +854,251 @@ nns_pipeline_src_inputdata (nns_src_h h,
 /**
  * @brief Get a handle to operate a selector (more info in tizen-api.h)
  */
-int nns_pipeline_switch_gethandle
-    (nns_pipeline_h pipe, const char *switchname, int *num_nodes,
-    nns_switch_type type, nns_switch_h * h)
+int
+nns_pipeline_switch_gethandle (nns_pipeline_h pipe, const char *switchname,
+    nns_switch_type type, nns_switch_h * h)
 {
-  /** @todo NYI */
+  element *elem;
+  nns_pipeline *p = pipe;
+  nns_switch *swtc;
+  int ret = NNS_ERROR_NONE;
 
-  return NNS_ERROR_NONE;
+  if (pipe == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The first argument, pipeline handle, is not valid.");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  if (switchname == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The second argument, switchname, is not valid.");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  g_mutex_lock (&p->lock);
+  elem = g_hash_table_lookup (p->namednodes, switchname);
+
+  if (elem == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "There is no switch element named [%s] in the pipeline.", switchname);
+    ret = NNS_ERROR_INVALID_PARAMETER;
+    goto unlock_return;
+  }
+
+  if (elem->type != NNSAPI_SWITCH_INPUT && elem->type != NNSAPI_SWITCH_OUTPUT) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "There is an element named [%s] in the pipeline, but it is not an input/output switch",
+        switchname);
+    ret = NNS_ERROR_INVALID_PARAMETER;
+    goto unlock_return;
+  }
+
+  *h = g_new0 (nns_switch, 1);
+  swtc = *h;
+
+  swtc->pipe = p;
+  swtc->element = elem;
+
+  if (type) {
+    if (elem->type == NNSAPI_SWITCH_INPUT)
+      *type = NNS_SWITCH_INPUTSELECTOR;
+    else if (elem->type == NNSAPI_SWITCH_OUTPUT)
+      *type = NNS_SWITCH_OUTPUTSELECTOR;
+    else {
+      dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+          "Internal data of switch-handle [%s] is broken. It is fatal.",
+          elem->name);
+      ret = NNS_ERROR_INVALID_PARAMETER;
+      goto unlock_return;
+    }
+  }
+
+  g_mutex_lock (&elem->lock);
+
+  elem->maxid++;
+  swtc->id = elem->maxid;
+  elem->handles = g_list_append (elem->handles, swtc);
+
+  g_mutex_unlock (&elem->lock);
+
+unlock_return:
+  g_mutex_unlock (&p->lock);
+  return ret;
 }
 
+#define handle_init(type, name, h) \
+  nns_##type *name= (h); \
+  nns_pipeline *p; \
+  element *elem; \
+  int ret = NNS_ERROR_NONE; \
+  if (h == NULL) { \
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline", \
+        "The handle to be unregistered is invalid"); \
+    return NNS_ERROR_INVALID_PARAMETER; \
+  } \
+\
+  p = name->pipe; \
+  elem = name->element; \
+  if (p == NULL || elem == NULL || p != elem->pipe) { \
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline", \
+        "The handle appears to be broken."); \
+    return NNS_ERROR_INVALID_PARAMETER; \
+  } \
+\
+  g_mutex_lock (&p->lock); \
+  g_mutex_lock (&elem->lock); \
+\
+  if (NULL == g_list_find (elem->handles, name)) { \
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline", \
+        "The handle does not exists."); \
+    ret = NNS_ERROR_INVALID_PARAMETER; \
+    goto unlock_return; \
+  }
+
+#define handle_exit(h) \
+unlock_return: \
+  g_mutex_unlock (&elem->lock); \
+  g_mutex_unlock (&p->lock); \
+  return ret;
+
 /**
  * @brief Close the given switch handle (more info in tizen-api.h)
  */
 int
 nns_pipeline_switch_puthandle (nns_switch_h h)
 {
-  /** @todo NYI */
+  handle_init (switch, swtc, h);
 
-  return NNS_ERROR_NONE;
+  elem->handles = g_list_remove (elem->handles, swtc);
+
+  handle_exit (h);
 }
 
 /**
  * @brief Control the switch (more info in tizen-api.h)
  */
 int
-nns_pipeline_switch_select (nns_switch_h h, int node)
+nns_pipeline_switch_select (nns_switch_h h, const char *padname)
 {
-  /** @todo NYI */
+  GstPad *active_pad, *new_pad;
+  gchar *active_name;
 
-  return NNS_ERROR_NONE;
+  handle_init (switch, swtc, h);
+
+  g_object_get (G_OBJECT (elem->element), "active-pad", &active_pad, NULL);
+  active_name = gst_pad_get_name (active_pad);
+
+  if (!g_strcmp0 (padname, active_name)) {
+    dlog_print (DLOG_INFO, "nnstreamer-capi-pipeline",
+        "Switch is called, but there is no effective changes: %s->%s.",
+        active_name, padname);
+    g_free (active_name);
+    gst_object_unref (active_pad);
+
+    goto unlock_return;
+  }
+
+  g_free (active_name);
+  gst_object_unref (active_pad);
+
+  new_pad = gst_element_get_static_pad (elem->element, padname);
+  if (new_pad == NULL) {
+    /* Not Found! */
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "Cannot find the pad, [%s], from the switch, [%s].",
+        padname, elem->name);
+    ret = NNS_ERROR_INVALID_PARAMETER;
+    goto unlock_return;
+  }
+
+  g_object_set (G_OBJECT (elem->element), "active-pad", new_pad, NULL);
+  gst_object_unref (new_pad);
+
+  dlog_print (DLOG_INFO, "nnstreamer-capi-pipeline",
+      "Switched to [%s] successfully at switch [%s].", padname, elem->name);
+
+  handle_exit (h);
+}
+
+/**
+ * @brief List nodes of a switch (more info in tizen-api.h)
+ */
+int
+nns_pipeline_switch_nodelist (nns_switch_h h, char ***list)
+{
+  GstIterator *it;
+  GValue item = G_VALUE_INIT;
+  gboolean done = FALSE;
+  GList *dllist = NULL;
+  GstPad *pad;
+  int counter = 0;
+
+  handle_init (switch, swtc, h);
+
+  if (elem->type == NNSAPI_SWITCH_INPUT)
+    it = gst_element_iterate_sink_pads (elem->element);
+  else if (elem->type == NNSAPI_SWITCH_OUTPUT)
+    it = gst_element_iterate_src_pads (elem->element);
+  else {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The element, [%s], is supposed to be input/output switch, but it is not. Internal data structure is broken.",
+        elem->name);
+    ret = NNS_ERROR_PIPELINE_FAIL;
+    goto unlock_return;
+  }
+
+  while (!done) {
+    switch (gst_iterator_next (it, &item)) {
+      case GST_ITERATOR_OK:
+        pad = GST_PAD (g_value_get_object (&item));
+        dllist = g_list_append (dllist, gst_pad_get_name (pad));
+        counter++;
+        g_value_reset (&item);
+        break;
+      case GST_ITERATOR_RESYNC:
+        g_list_free_full (dllist, g_free);      /* This frees all strings as well */
+        dllist = NULL;
+        counter = 0;
+        gst_iterator_resync (it);
+        break;
+      case GST_ITERATOR_ERROR:
+        dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+            "Cannot access the list of pad properly of a switch, [%s].",
+            elem->name);
+        ret = NNS_ERROR_PIPELINE_FAIL;
+        break;
+      case GST_ITERATOR_DONE:
+        done = TRUE;
+        break;
+    }
+  }
+
+  /* There has been no error with that "while" loop. */
+  if (ret == NNS_ERROR_NONE) {
+    int i = 0;
+    GList *l;
+
+    *list = g_malloc0 (sizeof (char *) * (counter + 1));
+
+    for (l = dllist; l != NULL; l = l->next) {
+      (*list)[i] = l->data;     /* Allocated by gst_pad_get_name(). Caller has to free it */
+      i++;
+
+      if (i > counter) {
+        g_list_free_full (dllist, g_free);      /* This frees all strings as well */
+        g_free (list);
+
+        dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+            "Internal data inconsistency. This could be a bug in nnstreamer. Switch [%s].",
+            elem->name);
+        ret = NNS_ERROR_PIPELINE_FAIL;
+        goto unlock_return;
+      }
+    }
+  }
+  g_list_free (dllist);         /* This does not free the strings.. fortunately. */
+
+  handle_exit (h);
 }
 
 /**
@@ -524,9 +1107,58 @@ nns_pipeline_switch_select (nns_switch_h h, int node)
 int nns_pipeline_valve_gethandle
     (nns_pipeline_h pipe, const char *valvename, nns_valve_h * h)
 {
-  /** @todo NYI */
+  element *elem;
+  nns_pipeline *p = pipe;
+  nns_valve *valve;
+  int ret = NNS_ERROR_NONE;
 
-  return NNS_ERROR_NONE;
+  if (pipe == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The first argument, pipeline handle, is not valid.");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  if (valvename == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "The second argument, valvename, is not valid.");
+    return NNS_ERROR_INVALID_PARAMETER;
+  }
+
+  g_mutex_lock (&p->lock);
+  elem = g_hash_table_lookup (p->namednodes, valvename);
+
+  if (elem == NULL) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "There is no valve element named [%s] in the pipeline.", valvename);
+    ret = NNS_ERROR_INVALID_PARAMETER;
+    goto unlock_return;
+  }
+
+  if (elem->type != NNSAPI_VALVE) {
+    dlog_print (DLOG_ERROR, "nnstreamer-capi-pipeline",
+        "There is an element named [%s] in the pipeline, but it is not a valve",
+        valvename);
+    ret = NNS_ERROR_INVALID_PARAMETER;
+    goto unlock_return;
+  }
+
+  *h = g_new0 (nns_valve, 1);
+  valve = *h;
+
+  valve->pipe = p;
+  valve->element = elem;
+
+  g_mutex_lock (&elem->lock);
+
+  elem->maxid++;
+  valve->id = elem->maxid;
+  elem->handles = g_list_append (elem->handles, valve);
+
+  g_mutex_unlock (&elem->lock);
+
+unlock_return:
+  g_mutex_unlock (&p->lock);
+  return ret;
 }
 
 /**
@@ -535,18 +1167,35 @@ int nns_pipeline_valve_gethandle
 int
 nns_pipeline_valve_puthandle (nns_valve_h h)
 {
-  /** @todo NYI */
+  handle_init (valve, valve, h);
 
-  return NNS_ERROR_NONE;
+  elem->handles = g_list_remove (elem->handles, valve);
+
+  handle_exit (h);
 }
 
 /**
  * @brief Control the valve with the given handle (more info in tizen-api.h)
  */
 int
-nns_pipeline_valve_control (nns_valve_h h, int valve_open)
+nns_pipeline_valve_control (nns_valve_h h, int valve_drop)
 {
-  /** @todo NYI */
+  gboolean drop;
+  handle_init (valve, valve, h);
 
-  return NNS_ERROR_NONE;
+  g_object_get (G_OBJECT (elem->element), "drop", &drop, NULL);
+
+  if ((valve_drop != 0) == (drop != FALSE)) {
+    /* Nothing to do */
+    dlog_print (DLOG_INFO, "nnstreamer-capi-pipeline",
+        "Valve is called, but there is no effective changes: %d->%d",
+        ! !drop, ! !valve_drop);
+    goto unlock_return;
+  }
+
+  g_object_set (G_OBJECT (elem->element), "drop", ! !valve_drop, NULL);
+  dlog_print (DLOG_INFO, "nnstreamer-capi-pipeline",
+      "Valve is changed: %d->%d", ! !drop, ! !valve_drop);
+
+  handle_exit (h);
 }