testsuite/test_event.py: add a test case for autoplugging behaviour: create a source...
authorThomas Vander Stichele <thomas@apestaart.org>
Wed, 1 Feb 2006 11:52:04 +0000 (11:52 +0000)
committerThomas Vander Stichele <thomas@apestaart.org>
Wed, 1 Feb 2006 11:52:04 +0000 (11:52 +0000)
Original commit message from CVS:

* testsuite/test_event.py:
add a test case for autoplugging behaviour:
create a source, connect probes, store new-segment event,
add element in buffer probe callback, and forward event
Currently fails due to refcounting on the stored new-segment
event

ChangeLog
testsuite/test_event.py

index a248a6b..711d5be 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,14 @@
 2006-02-01  Thomas Vander Stichele  <thomas at apestaart dot org>
 
+       * testsuite/test_event.py:
+         add a test case for autoplugging behaviour:
+         create a source, connect probes, store new-segment event,
+         add element in buffer probe callback, and forward event
+         Currently fails due to refcounting on the stored new-segment
+         event
+
+2006-02-01  Thomas Vander Stichele  <thomas at apestaart dot org>
+
        * testsuite/test_element.py:
          add another link test
 
index 3341857..ca898f4 100644 (file)
@@ -142,5 +142,85 @@ class TestEmit(TestCase):
         assert isinstance(event, gst.Event)
     
 
+class TestDelayedEventProbe(TestCase):
+    # this test:
+    # starts a pipeline with only a source
+    # adds an event probe to catch the (first) new-segment
+    # adds a buffer probe to "autoplug" and send out this event
+    def setUp(self):
+        TestCase.setUp(self)
+        self.pipeline = gst.Pipeline()
+        self.src = gst.element_factory_make('fakesrc')
+        self.src.set_property('num-buffers', 10)
+        self.pipeline.add(self.src)
+        self.srcpad = self.src.get_pad('src')
+        
+    def tearDown(self):
+        gst.debug('setting pipeline to NULL')
+        self.pipeline.set_state(gst.STATE_NULL)
+        gst.debug('set pipeline to NULL')
+        # FIXME: wait for state change thread to die
+        while self.pipeline.__gstrefcount__ > 1:
+            gst.debug('waiting for self.pipeline G rc to drop to 1')
+            time.sleep(0.1)
+        self.assertEquals(self.pipeline.__gstrefcount__, 1)
+
+    def FIXMEtestProbe(self):
+        self.srcpad.add_event_probe(self._event_probe_cb)
+        self._buffer_probe_id = self.srcpad.add_buffer_probe(
+            self._buffer_probe_cb)
+
+        self._newsegment = None
+        self._eos = None
+        self._had_buffer = False
+
+        self.pipeline.set_state(gst.STATE_PLAYING)
+
+        while not self._eos:
+            time.sleep(0.1)
+
+        # verify if our newsegment event is still around and valid
+        self.failUnless(self._newsegment)
+        self.assertEquals(self._newsegment.type, gst.EVENT_NEWSEGMENT)
+        self.assertEquals(self._newsegment.__grefcount__, 1)
+
+        # verify if our eos event is still around and valid
+        self.failUnless(self._eos)
+        self.assertEquals(self._eos.type, gst.EVENT_EOS)
+        self.assertEquals(self._eos.__grefcount__, 1)
+    def _event_probe_cb(self, pad, event):
+        if event.type == gst.EVENT_NEWSEGMENT:
+            self._newsegment = event
+            self.assertEquals(event.__grefcount__, 3)
+            # drop the event, we're storing it for later sending
+            return False
+
+        if  event.type == gst.EVENT_EOS:
+            self._eos = event
+            # we also want fakesink to get it
+            return True
+
+        self.fail("Got an unknown event %r" % event)
+
+    def _buffer_probe_cb(self, pad, buffer):
+        self.failUnless(self._newsegment)
+
+        # fake autoplugging by now putting in a fakesink
+        sink = gst.element_factory_make('fakesink')
+        self.pipeline.add(sink)
+        self.src.link(sink)
+        sink.set_state(gst.STATE_PLAYING)
+
+        pad = sink.get_pad('sink')
+        pad.send_event(self._newsegment)
+
+        # we don't want to be called again
+        self.srcpad.remove_buffer_probe(self._buffer_probe_id)
+        
+        self._had_buffer = True
+        # now let the buffer through
+        return True
+
 if __name__ == "__main__":
     unittest.main()