Initial import to Tizen
[profile/ivi/gstreamer-python.git] / testsuite / test_ghostpad.py
1 # -*- Mode: Python -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # gst-python - Python bindings for GStreamer
5 # Copyright (C) 2002 David I. Lehn
6 # Copyright (C) 2004 Johan Dahlin
7 # Copyright (C) 2005 Edward Hervey
8 #
9 # This library is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public
11 # License as published by the Free Software Foundation; either
12 # version 2.1 of the License, or (at your option) any later version.
13 #
14 # This library is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 # Lesser General Public License for more details.
18 #
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with this library; if not, write to the Free Software
21 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
22
23 from common import gst, unittest, TestCase, pygobject_2_13
24
25 import sys
26 import gc
27 import gobject
28
class SrcBin(gst.Bin):
    """Bin wrapping a single fakesrc, exposed through a "src" ghost pad."""

    def prepare(self):
        """Create the fakesrc, add it to the bin and ghost its "src" pad."""
        fakesrc = gst.element_factory_make('fakesrc')
        self.add(fakesrc)
        # The ghost pad makes the inner element's pad linkable from outside
        # the bin.
        self.add_pad(gst.GhostPad("src", fakesrc.get_pad("src")))


gobject.type_register(SrcBin)
37
class SinkBin(gst.Bin):
    """Bin wrapping a single fakesink, exposed through a "sink" ghost pad."""

    def prepare(self):
        """Create the fakesink, add it to the bin and ghost its "sink" pad."""
        fakesink = gst.element_factory_make('fakesink')
        self.add(fakesink)
        self.add_pad(gst.GhostPad("sink", fakesink.get_pad("sink")))
        # Kept on the instance so connect_handoff() can reach the element.
        self.sink = fakesink

    def connect_handoff(self, cb, *args, **kwargs):
        """Enable handoff signals on the fakesink and connect `cb` to them.

        Extra positional and keyword arguments are forwarded unchanged to
        the element's connect() call.  prepare() must have been called first,
        since this reads self.sink.
        """
        fakesink = self.sink
        fakesink.set_property('signal-handoffs', True)
        fakesink.connect('handoff', cb, *args, **kwargs)


gobject.type_register(SinkBin)
52
53         
54 class PipeTest(TestCase):
55     def setUp(self):
56         gst.info("setUp")
57         TestCase.setUp(self)
58         self.pipeline = gst.Pipeline()
59         self.assertEquals(self.pipeline.__gstrefcount__, 1)
60         self.assertEquals(sys.getrefcount(self.pipeline), pygobject_2_13 and 2 or 3)
61
62         self.src = SrcBin()
63         self.src.prepare()
64         self.sink = SinkBin()
65         self.sink.prepare()
66         self.assertEquals(self.src.__gstrefcount__, 1)
67         self.assertEquals(sys.getrefcount(self.src), pygobject_2_13 and 2 or 3)
68         self.assertEquals(self.sink.__gstrefcount__, 1)
69         self.assertEquals(sys.getrefcount(self.sink), pygobject_2_13 and 2 or 3)
70         gst.info("end of SetUp")
71
72     def tearDown(self):
73         gst.info("tearDown")
74         self.assertTrue (self.pipeline.__gstrefcount__ >= 1 and self.pipeline.__gstrefcount__ <= 2)
75         self.assertEquals(sys.getrefcount(self.pipeline), pygobject_2_13 and 2 or 3)
76         self.assertEquals(self.src.__gstrefcount__, 2)
77         self.assertEquals(sys.getrefcount(self.src), pygobject_2_13 and 2 or 3)
78         self.assertEquals(self.sink.__gstrefcount__, 2)
79         self.assertEquals(sys.getrefcount(self.sink), 3)
80         gst.debug('deleting pipeline')
81         del self.pipeline
82         self.gccollect()
83
84         self.assertEquals(self.src.__gstrefcount__, 1) # parent gone
85         self.assertEquals(self.sink.__gstrefcount__, 1) # parent gone
86         self.assertEquals(sys.getrefcount(self.src), pygobject_2_13 and 2 or 3)
87         self.assertEquals(sys.getrefcount(self.sink), pygobject_2_13 and 2 or 3)
88         gst.debug('deleting src')
89         del self.src
90         self.gccollect()
91         gst.debug('deleting sink')
92         del self.sink
93         self.gccollect()
94
95         TestCase.tearDown(self)
96         
97     def testBinState(self):
98         self.pipeline.add(self.src, self.sink)
99         self.src.link(self.sink)
100         self.sink.connect_handoff(self._sink_handoff_cb)
101         self._handoffs = 0
102
103         self.assertTrue(self.pipeline.set_state(gst.STATE_PLAYING) != gst.STATE_CHANGE_FAILURE)
104         while True:
105             (ret, cur, pen) = self.pipeline.get_state()
106             if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_PLAYING:
107                 break
108
109         while self._handoffs < 10:
110                 pass
111
112         self.assertEquals(self.pipeline.set_state(gst.STATE_NULL), gst.STATE_CHANGE_SUCCESS)
113         while True:
114             (ret, cur, pen) = self.pipeline.get_state()
115             if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_NULL:
116                 break
117
118 ##     def testProbedLink(self):
119 ##         self.pipeline.add(self.src)
120 ##         pad = self.src.get_pad("src")
121         
122 ##         self.sink.connect_handoff(self._sink_handoff_cb)
123 ##         self._handoffs = 0
124
125 ##         # FIXME: adding a probe to the ghost pad does not work atm
126 ##         # id = pad.add_buffer_probe(self._src_buffer_probe_cb)
127 ##         realpad = pad.get_target()
128 ##         self._probe_id = realpad.add_buffer_probe(self._src_buffer_probe_cb)
129
130 ##         self._probed = False
131         
132 ##         while True:
133 ##             (ret, cur, pen) = self.pipeline.get_state()
134 ##             if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_PLAYING:
135 ##                 break
136
137 ##         while not self._probed:
138 ##             pass
139
140 ##         while self._handoffs < 10:
141 ##             pass
142
143 ##         self.pipeline.set_state(gst.STATE_NULL)
144 ##         while True:
145 ##             (ret, cur, pen) = self.pipeline.get_state()
146 ##             if ret == gst.STATE_CHANGE_SUCCESS and cur == gst.STATE_NULL:
147 ##                 break
148
149     def _src_buffer_probe_cb(self, pad, buffer):
150         gst.debug("received probe on pad %r" % pad)
151         self._probed = True
152         gst.debug('adding sink bin')
153         self.pipeline.add(self.sink)
154         # this seems to get rid of the warnings about pushing on an unactivated
155         # pad
156         gst.debug('setting sink state')
157         
158         # FIXME: attempt one: sync to current pending state of bin
159         (res, cur, pen) = self.pipeline.get_state(timeout=0)
160         target = pen
161         if target == gst.STATE_VOID_PENDING:
162             target = cur
163         gst.debug("setting sink state to %r" % target)
164         # FIXME: the following print can cause a lock-up; why ?
165         # print target
166         # if we don't set async, it will possibly end up in PAUSED
167         self.sink.set_state(target)
168         
169         gst.debug('linking')
170         self.src.link(self.sink)
171         gst.debug('removing buffer probe id %r' % self._probe_id)
172         pad.remove_buffer_probe(self._probe_id)
173         self._probe_id = None
174         gst.debug('done')
175
176     def _sink_handoff_cb(self, sink, buffer, pad):
177         gst.debug('received handoff on pad %r' % pad)
178         self._handoffs += 1
179
class TargetTest(TestCase):
    """Checks GhostPad.get_target() against set_target()."""

    def test_target(self):
        # A ghost pad must report the pad it was created with, report None
        # once the target is cleared, and report the pad again once re-set.
        # (A comment rather than a docstring, so that unittest's verbose
        # output keeps showing the method name.)
        target = gst.Pad("src", gst.PAD_SRC)
        ghost_pad = gst.GhostPad("ghost_src", target)
        self.assertTrue(ghost_pad.get_target() is target)

        ghost_pad.set_target(None)
        self.assertTrue(ghost_pad.get_target() is None)

        ghost_pad.set_target(target)
        self.assertTrue(ghost_pad.get_target() is target)


# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()