2 # vi:si:et:sw=4:sts=4:ts=4
4 # gst-python - Python bindings for GStreamer
5 # Copyright (C) 2002 David I. Lehn
6 # Copyright (C) 2004 Johan Dahlin
7 # Copyright (C) 2005 Edward Hervey
9 # This library is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public
11 # License as published by the Free Software Foundation; either
12 # version 2.1 of the License, or (at your option) any later version.
14 # This library is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with this library; if not, write to the Free Software
21 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 from common import gst, unittest, TestCase, pygobject_2_13
# SrcBin: a gst.Bin subclass whose visible statements create a 'fakesrc'
# element and expose that element's "src" pad through a ghost pad on the bin.
# NOTE(review): lines in this extract carry their original line number as a
# prefix and some original lines (e.g. 30 and 32) are absent, so the method
# header that owns the statements below is not visible here.
29 class SrcBin(gst.Bin):
# Build the fakesrc element via the element factory.
31 src = gst.element_factory_make('fakesrc')
# Look up the element's "src" pad and wrap it in a ghost pad also named "src".
33 pad = src.get_pad("src")
34 ghostpad = gst.GhostPad("src", pad)
# Attach the ghost pad to this bin.
35 self.add_pad(ghostpad)
# Register the SrcBin class through gobject.type_register.
36 gobject.type_register(SrcBin)
# SinkBin: a gst.Bin subclass whose visible statements create a 'fakesink'
# element and expose that element's "sink" pad through a ghost pad on the bin.
# NOTE(review): lines in this extract carry their original line number as a
# prefix and some original lines (e.g. 39, 41, 45, 46) are absent, so the
# method header owning the first statements is not visible here.
38 class SinkBin(gst.Bin):
# Build the fakesink element via the element factory.
40 sink = gst.element_factory_make('fakesink')
# Look up the element's "sink" pad and wrap it in a ghost pad also named "sink".
42 pad = sink.get_pad("sink")
43 ghostpad = gst.GhostPad("sink", pad)
# Attach the ghost pad to this bin.
44 self.add_pad(ghostpad)
# connect_handoff: enable the 'signal-handoffs' property on self.sink and
# connect `cb` to its 'handoff' signal, forwarding any extra positional and
# keyword arguments to connect().
# NOTE(review): self.sink is read here but its assignment is not among the
# visible lines - presumably it is set on one of the absent lines; confirm.
47 def connect_handoff(self, cb, *args, **kwargs):
48 self.sink.set_property('signal-handoffs', True)
49 self.sink.connect('handoff', cb, *args, **kwargs)
# Register the SinkBin class through gobject.type_register.
51 gobject.type_register(SinkBin)
# PipeTest: exercises a gst.Pipeline together with self.src and self.sink,
# checking GStreamer-side (__gstrefcount__) and Python-side (sys.getrefcount)
# reference counts around the test.
# NOTE(review): every line in this extract carries its original line number as
# a prefix, and some original lines (method headers, loop bodies, assignments)
# are absent; the comments below describe only the statements that are visible.
54 class PipeTest(TestCase):
# Fixture setup (method header not visible here): create the pipeline and check
# that it starts with one GStreamer reference. The expected Python refcount is
# 2 when pygobject_2_13 is truthy, otherwise 3.
58 self.pipeline = gst.Pipeline()
59 self.assertEquals(self.pipeline.__gstrefcount__, 1)
60 self.assertEquals(sys.getrefcount(self.pipeline), pygobject_2_13 and 2 or 3)
# Same pair of checks for self.src and self.sink; the statements that create
# them are not visible in this extract.
66 self.assertEquals(self.src.__gstrefcount__, 1)
67 self.assertEquals(sys.getrefcount(self.src), pygobject_2_13 and 2 or 3)
68 self.assertEquals(self.sink.__gstrefcount__, 1)
69 self.assertEquals(sys.getrefcount(self.sink), pygobject_2_13 and 2 or 3)
70 gst.info("end of SetUp")
# Fixture teardown checks (method header not visible here): the pipeline may
# hold 1 or 2 GStreamer references, while src and sink are each expected to
# hold 2. The "parent gone" comments further down suggest the second reference
# belongs to the parent pipeline - confirm.
74 self.assertTrue (self.pipeline.__gstrefcount__ >= 1 and self.pipeline.__gstrefcount__ <= 2)
75 self.assertEquals(sys.getrefcount(self.pipeline), pygobject_2_13 and 2 or 3)
76 self.assertEquals(self.src.__gstrefcount__, 2)
77 self.assertEquals(sys.getrefcount(self.src), pygobject_2_13 and 2 or 3)
78 self.assertEquals(self.sink.__gstrefcount__, 2)
# NOTE(review): this check expects a literal 3, unlike the sibling checks that
# pick 2 or 3 based on pygobject_2_13 - confirm this is intentional.
79 self.assertEquals(sys.getrefcount(self.sink), 3)
80 gst.debug('deleting pipeline')
# The statement that releases the pipeline is not visible here; afterwards src
# and sink are expected to be back at a single GStreamer reference each.
84 self.assertEquals(self.src.__gstrefcount__, 1) # parent gone
85 self.assertEquals(self.sink.__gstrefcount__, 1) # parent gone
86 self.assertEquals(sys.getrefcount(self.src), pygobject_2_13 and 2 or 3)
87 self.assertEquals(sys.getrefcount(self.sink), pygobject_2_13 and 2 or 3)
# Only the log messages for releasing src and sink are visible; the releasing
# statements themselves are on absent lines.
88 gst.debug('deleting src')
91 gst.debug('deleting sink')
# Chain up to the base class teardown.
95 TestCase.tearDown(self)
# testBinState: add src and sink to the pipeline, link them, run the pipeline
# in PLAYING until the handoff counter reaches 10, then bring it back to NULL.
97 def testBinState(self):
98 self.pipeline.add(self.src, self.sink)
99 self.src.link(self.sink)
# Have the sink report each handoff to _sink_handoff_cb.
100 self.sink.connect_handoff(self._sink_handoff_cb)
# Any result other than STATE_CHANGE_FAILURE is accepted for the switch to
# PLAYING.
103 self.assertTrue(self.pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_FAILURE)
# Query the pipeline state and test for a completed change to PLAYING; the
# body of this branch (and any enclosing loop) is not visible here.
105 (ret, cur, pen) = self.pipeline.get_state()
106 if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_PLAYING:
# Keep going until self._handoffs reaches 10; the loop body is not visible here.
109 while self._handoffs < 10:
# Going back to NULL must report STATE_CHANGE_SUCCESS directly.
112 self.assertEquals(self.pipeline.set_state(gst.STATE_NULL), gst.STATE_CHANGE_SUCCESS)
# Query the state again and test for a completed change to NULL; the branch
# body is not visible here.
114 (ret, cur, pen) = self.pipeline.get_state()
115 if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_NULL:
# Disabled test, kept as '##' comments: it adds only src to the pipeline and
# installs a buffer probe on the ghost pad's target pad.
118 ## def testProbedLink(self):
119 ## self.pipeline.add(self.src)
120 ## pad = self.src.get_pad("src")
122 ## self.sink.connect_handoff(self._sink_handoff_cb)
123 ## self._handoffs = 0
125 ## # FIXME: adding a probe to the ghost pad does not work atm
126 ## # id = pad.add_buffer_probe(self._src_buffer_probe_cb)
127 ## realpad = pad.get_target()
128 ## self._probe_id = realpad.add_buffer_probe(self._src_buffer_probe_cb)
130 ## self._probed = False
133 ## (ret, cur, pen) = self.pipeline.get_state()
134 ## if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_PLAYING:
137 ## while not self._probed:
140 ## while self._handoffs < 10:
143 ## self.pipeline.set_state(gst.STATE_NULL)
145 ## (ret, cur, pen) = self.pipeline.get_state()
146 ## if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_NULL:
# _src_buffer_probe_cb: buffer-probe callback taking (pad, buffer). The visible
# statements add self.sink to the pipeline, set the sink's state to `target`,
# link src to sink, and remove the probe identified by self._probe_id.
149 def _src_buffer_probe_cb(self, pad, buffer):
150 gst.debug("received probe on pad %r" % pad)
152 gst.debug('adding sink bin')
153 self.pipeline.add(self.sink)
154 # this seems to get rid of the warnings about pushing on an unactivated
156 gst.debug('setting sink state')
158 # FIXME: attempt one: sync to current pending state of bin
# Query the pipeline state with timeout=0 (presumably so the call does not
# block - confirm against the get_state documentation).
159 (res, cur, pen) = self.pipeline.get_state(timeout=0)
# NOTE(review): `target` is assigned on a line absent from this extract, and
# the body of this branch is not visible either.
161 if target == gst.STATE_VOID_PENDING:
163 gst.debug("setting sink state to %r" % target)
164 # FIXME: the following print can cause a lock-up; why ?
166 # if we don't set async, it will possibly end up in PAUSED
167 self.sink.set_state(target)
# Link src to the newly added sink.
170 self.src.link(self.sink)
# Remove this probe from the pad and clear the stored probe id.
171 gst.debug('removing buffer probe id %r' % self._probe_id)
172 pad.remove_buffer_probe(self._probe_id)
173 self._probe_id = None
# _sink_handoff_cb: handler connected through connect_handoff; receives
# (sink, buffer, pad). Only the debug log is visible here - the statement that
# updates self._handoffs is presumably on an absent line; confirm.
176 def _sink_handoff_cb(self, sink, buffer, pad):
177 gst.debug('received handoff on pad %r' % pad)
# TargetTest: checks that a ghost pad's target can be read, cleared and set
# again.
# NOTE(review): lines in this extract carry their original line number as a
# prefix and blank lines from the original are absent.
180 class TargetTest(TestCase):
# test_target: get_target() must return the very pad object the ghost pad was
# built around (identity check via `is`), None after set_target(None), and the
# same pad object again after set_target(src).
181 def test_target(self):
# A standalone source pad to use as the ghost pad's target.
182 src = gst.Pad("src", gst.PAD_SRC)
# Ghost pad created with `src` as its initial target.
184 ghost = gst.GhostPad("ghost_src", src)
185 self.failUnless(ghost.get_target() is src)
# Clearing the target makes get_target() return None.
187 ghost.set_target(None)
188 self.failUnless(ghost.get_target() is None)
# Setting the target again makes get_target() return the original pad object.
190 ghost.set_target(src)
191 self.failUnless(ghost.get_target() is src)
193 if __name__ == "__main__":