Removed not need thread and changed to push out blocksize bytes.
authorEdgard Lima <edgard.lima@indt.org.br>
Wed, 4 Jan 2006 13:26:35 +0000 (13:26 +0000)
committerEdgard Lima <edgard.lima@indt.org.br>
Wed, 4 Jan 2006 13:26:35 +0000 (13:26 +0000)
Original commit message from CVS:
Removed not need thread and changed to push out blocksize bytes.

ChangeLog
common
ext/neon/gstneonhttpsrc.c
ext/neon/gstneonhttpsrc.h

index f31a342..de44356 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,9 @@
+2006-01-04  Edgard Lima <edgard.lima@indt.org.br>
+
+       * ext/neon/gstneonhttpsrc.c:
+       * ext/neon/gstneonhttpsrc.h:
+       Removed not need thread and changed to push out blocksize bytes.
+       
 2005-12-28  Edgard Lima <edgard.lima@indt.org.br>
 
        * configure.ac:
diff --git a/common b/common
index d1911d4..5f10c87 160000 (submodule)
--- a/common
+++ b/common
@@ -1 +1 @@
-Subproject commit d1911d4b3d6267f9cd9dfb68fcef2afe4d098092
+Subproject commit 5f10c872cafb3eb8058d63e438cae029ed9e8d73
index 1f9fad7..319dc26 100644 (file)
@@ -49,11 +49,8 @@ enum
   PROP_PROXY
 };
 
-static void request_dispatch (void *data);
 static void oom_callback ();
-static int accept_response (void *userdata, ne_request * req,
-    const ne_status * st);
-static void block_reader (void *userdata, const char *buf, size_t len);
+
 static void size_header_handler (void *userdata, const char *value);
 
 static gboolean set_proxy (const char *uri, ne_uri * parsed,
@@ -165,11 +162,7 @@ gst_neonhttp_src_init (GstNeonhttpSrc * this, GstNeonhttpSrcClass * g_class)
   set_uri (NULL, &this->uri, &this->ishttps, &this->uristr, TRUE);
   set_proxy (NULL, &this->proxy, TRUE);
 
-  this->lock = g_mutex_new ();
   this->adapter = gst_adapter_new ();
-  this->task = gst_task_create (request_dispatch, this);
-  g_static_rec_mutex_init (&this->tasklock);
-  gst_task_set_lock (this->task, &this->tasklock);
 
   gst_base_src_set_live (GST_BASE_SRC (this), TRUE);
 
@@ -200,22 +193,80 @@ gst_neonhttp_src_finalize (GObject * gobject)
     g_object_unref (this->adapter);
   }
 
-  if (this->lock) {
-    g_mutex_free (this->lock);
+  if (this->uristr) {
+    ne_free (this->uristr);
+  }
+
+}
+
+int
+request_dispatch (GstNeonhttpSrc * src, GstBuffer * outbuf)
+{
+
+  GstPad *peer;
+  int ret;
+  int read = 0;
+  int sizetoread = GST_BUFFER_SIZE (outbuf);
+
+  /* Loop sending the request:
+   * Retry whilst authentication fails and we supply it. */
+
+  ssize_t len = 0;
+
+  while (sizetoread > 0) {
+
+    if (!GST_OBJECT_FLAG_IS_SET (src, GST_NEONHTTP_SRC_OPEN)) {
+      GST_BUFFER_SIZE (outbuf) = read;
+      return read;
+    }
+    len = ne_read_response_block (src->request,
+        (char *) GST_BUFFER_DATA (outbuf) + read, sizetoread);
+    if (len > 0) {
+      read += len;
+      sizetoread -= len;
+    } else {
+      break;
+    }
+
+  }
+
+  GST_BUFFER_SIZE (outbuf) = read;
+
+  if (len < 0) {
+    read = -2;
+    goto done;
+  } else if (len == 0) {
+    ret = ne_end_request (src->request);
+    if (ret != NE_RETRY) {
+      if (ret == NE_OK) {
+        GST_DEBUG ("Returning EOS");
+        peer = gst_pad_get_peer (GST_BASE_SRC_PAD (src));
+        if (!gst_pad_send_event (peer, gst_event_new_eos ())) {
+          ret = GST_FLOW_ERROR;
+        }
+        gst_object_unref (peer);
+      } else {
+        read = -3;
+        GST_ERROR ("Request failed. code:%d, desc: %s\n", ret,
+            ne_get_error (src->session));
+      }
+    }
+    goto done;
   }
 
-  gst_object_unref (this->task);
+done:
 
-  g_static_rec_mutex_free (&this->tasklock);
+  return read;
 
 }
 
+
 static GstFlowReturn
 gst_neonhttp_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 {
   GstNeonhttpSrc *src;
   GstFlowReturn ret = GST_FLOW_OK;
-  guint avail;
+  int read;
 
   src = GST_NEONHTTP_SRC (psrc);
 
@@ -224,46 +275,19 @@ gst_neonhttp_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
 
   GST_LOG_OBJECT (src, "asked for a buffer");
 
-  while (1) {
-    g_mutex_lock (src->lock);
-    if ((avail = gst_adapter_available (src->adapter))) {
-      g_mutex_unlock (src->lock);
-      break;
-    } else if (src->eos) {
-      GstPad *peer;
+  *outbuf = gst_buffer_new_and_alloc (GST_BASE_SRC (psrc)->blocksize);
 
-      g_mutex_unlock (src->lock);
+  read = request_dispatch (src, *outbuf);
+  if (read > 0) {
 
-      *outbuf = NULL;
-      GST_DEBUG ("Returning EOS");
-      peer = gst_pad_get_peer (GST_BASE_SRC_PAD (src));
-      if (!gst_pad_send_event (peer, gst_event_new_eos ())) {
-        ret = GST_FLOW_ERROR;
-      }
-      gst_object_unref (peer);
-      goto done;
+    if (*outbuf) {
+      gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
     }
-    g_mutex_unlock (src->lock);
-    usleep (250000);
-  }
-
-  g_mutex_lock (src->lock);
-
-  avail = gst_adapter_available (src->adapter);
-  avail = avail > (4 * 1024) ? (4 * 1024) : avail;
-  *outbuf = gst_buffer_new_and_alloc (avail);
-  memcpy (GST_BUFFER_DATA (*outbuf), gst_adapter_peek (src->adapter, avail),
-      avail);
-  gst_adapter_flush (src->adapter, avail);
-
-  g_mutex_unlock (src->lock);
 
-  if (*outbuf) {
-    gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src)));
+  } else if (read < 0) {
+    return GST_FLOW_ERROR;
   }
 
-done:
-
   return ret;
 
 wrong_state:
@@ -516,17 +540,12 @@ gst_neonhttp_src_start (GstBaseSrc * bsrc)
   ne_add_response_header_handler (src->request, "Content-Length",
       size_header_handler, src);
 
-  ne_add_response_body_reader (src->request, accept_response, block_reader,
-      src);
-
   if (NE_OK != ne_begin_request (src->request)) {
     ret = FALSE;
     goto done;
   }
 
-  src->eos = FALSE;
   GST_OBJECT_FLAG_SET (src, GST_NEONHTTP_SRC_OPEN);
-  gst_task_start (src->task);
 
 done:
 
@@ -546,9 +565,6 @@ gst_neonhttp_src_stop (GstBaseSrc * bsrc)
 
   GST_OBJECT_FLAG_UNSET (src, GST_NEONHTTP_SRC_OPEN);
 
-  gst_task_stop (src->task);
-  gst_task_join (src->task);
-
   if (src->request) {
     ne_request_destroy (src->request);
     src->request = NULL;
@@ -563,54 +579,6 @@ gst_neonhttp_src_stop (GstBaseSrc * bsrc)
   return TRUE;
 }
 
-void
-request_dispatch (void *data)
-{
-
-  int ret;
-
-  GstNeonhttpSrc *src;
-
-  src = GST_NEONHTTP_SRC (data);
-
-  /* Loop sending the request:
-   * Retry whilst authentication fails and we supply it. */
-
-  do {
-    ssize_t len;
-
-    do {
-      if (!GST_OBJECT_FLAG_IS_SET (src, GST_NEONHTTP_SRC_OPEN)) {
-        return;
-      }
-      len = ne_read_response_block (src->request,
-          src->respbuf, sizeof (src->respbuf));
-    } while (len > 0);
-
-    if (len < 0) {
-      ret = NE_ERROR;
-      break;
-    }
-
-    ret = ne_end_request (src->request);
-
-  } while (ret == NE_RETRY);
-
-  if (ret != NE_OK) {
-    GST_ERROR ("Request failed. code:%d, desc: %s\n", ret,
-        ne_get_error (src->session));
-  }
-
-  g_mutex_lock (src->lock);
-  src->eos = TRUE;
-  g_mutex_unlock (src->lock);
-  gst_task_stop (src->task);
-
-  return;
-
-}
-
-
 /* entry point to initialize the plug-in
  * initialize the plug-in itself
  * register the element factories and pad templates
@@ -682,39 +650,11 @@ oom_callback ()
   GST_ERROR ("memory exeception in neon\n");
 }
 
-static int
-accept_response (void *userdata, ne_request * req, const ne_status * st)
-{
-  GST_LOG ("ne_accept_response called code = %d phrase %s\n", st->code,
-      st->reason_phrase);
-  return ne_accept_2xx (userdata, req, st);
-}
-
-static void
-block_reader (void *userdata, const char *buf, size_t len)
-{
-
-  if (len) {
-    GstNeonhttpSrc *src = GST_NEONHTTP_SRC (userdata);
-    GstBuffer *buffer = gst_buffer_new_and_alloc (len);
-
-    memcpy (GST_BUFFER_DATA (buffer), buf, len);
-
-    g_mutex_lock (src->lock);
-    gst_adapter_push (src->adapter, buffer);
-    src->current_size += len;
-    g_mutex_unlock (src->lock);
-
-  }
-}
-
 void
 size_header_handler (void *userdata, const char *value)
 {
   GstNeonhttpSrc *src = GST_NEONHTTP_SRC (userdata);
 
-  g_mutex_lock (src->lock);
   src->content_size = atoi (value);
-  g_mutex_unlock (src->lock);
 
 }
index 6691d63..05c6b55 100644 (file)
@@ -62,12 +62,8 @@ struct _GstNeonhttpSrc {
   gint current_size;
 
   GstAdapter *adapter;
-  GMutex *lock;
-  GstTask *task;
-  GStaticRecMutex tasklock;
-  gboolean eos;
 
-  char respbuf[BUFSIZ];
+  gboolean eos;
 
 };