2 # vi:si:et:sw=4:sts=4:ts=4
4 # gst-python - Python bindings for GStreamer
5 # Copyright (C) 2002 David I. Lehn
6 # Copyright (C) 2004 Johan Dahlin
7 # Copyright (C) 2005 Edward Hervey
9 # This library is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public
11 # License as published by the Free Software Foundation; either
12 # version 2.1 of the License, or (at your option) any later version.
14 # This library is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with this library; if not, write to the Free Software
21 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
import os
import tempfile
import time

from common import gst, unittest, testhelper, TestCase
class EventTest(TestCase):
    """Send seek events, and objects that are not events, to a running sink."""

    def setUp(self):
        """Start a ``fakesrc ! fakesink`` pipeline and look up its sink."""
        TestCase.setUp(self)
        self.pipeline = gst.parse_launch('fakesrc ! fakesink name=sink')
        self.sink = self.pipeline.get_by_name('sink')
        self.pipeline.set_state(gst.STATE_PLAYING)

    def tearDown(self):
        """Stop the pipeline and check that we hold the only reference to it."""
        gst.debug('setting pipeline to NULL')
        self.pipeline.set_state(gst.STATE_NULL)
        gst.debug('set pipeline to NULL')
        # FIXME: wait for state change thread to die
        while self.pipeline.__gstrefcount__ > 1:
            gst.debug('waiting for self.pipeline G rc to drop to 1')
            # sleep between polls instead of spinning on the refcount
            time.sleep(0.1)
        self.assertEqual(self.pipeline.__gstrefcount__, 1)

        # drop our own references before chaining up to the base class
        del self.sink
        del self.pipeline
        TestCase.tearDown(self)

    def testEventSeek(self):
        # this event only serves to change the rate of data transfer
        event = gst.event_new_seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
                                   gst.SEEK_TYPE_NONE, 0,
                                   gst.SEEK_TYPE_NONE, 0)
        # FIXME: but basesrc goes into an mmap/munmap spree, needs to be fixed

        # the values the event is built from are also the values it has to
        # parse back to, so spell them out only once
        seek_args = (1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
                     gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_NONE, 0)
        event = gst.event_new_seek(*seek_args)

        gst.debug('sending event')
        self.sink.send_event(event)
        gst.debug('sent event')

        self.assertEqual(event.parse_seek(), seek_args)

    def testWrongEvent(self):
        # send_event() has to refuse anything that is not a gst.Event:
        # first another GStreamer object, then a plain Python integer
        buf = gst.Buffer()
        self.assertRaises(TypeError, self.sink.send_event, buf)
        number = 1
        self.assertRaises(TypeError, self.sink.send_event, number)
class EventFileSrcTest(TestCase):
    """Play a ten-character file through filesrc and record sink handoffs."""

    def setUp(self):
        """Write the digits 0-9 to a temporary file and build the pipeline."""
        TestCase.setUp(self)
        # mkstemp() creates the file itself, so there is no window between
        # picking the name and opening it as there was with mktemp()
        fd, self.filename = tempfile.mkstemp()
        handle = os.fdopen(fd, 'w')
        try:
            handle.write(''.join(map(str, range(10))))
        finally:
            handle.close()

        # filled by handoff_cb(); playAndIter() resets it around every run
        self.handoffs = []

        self.pipeline = gst.parse_launch(
            'filesrc name=source location=%s blocksize=1 ! '
            'fakesink signal-handoffs=1 name=sink' % self.filename)
        self.source = self.pipeline.get_by_name('source')
        self.sink = self.pipeline.get_by_name('sink')
        self.sigid = self.sink.connect('handoff', self.handoff_cb)
        self.bus = self.pipeline.get_bus()

    def tearDown(self):
        """Stop the pipeline, disconnect the handler and remove the file."""
        self.pipeline.set_state(gst.STATE_NULL)
        self.sink.disconnect(self.sigid)
        if os.path.exists(self.filename):
            os.remove(self.filename)

        # drop our own references before chaining up to the base class
        del self.bus
        del self.pipeline
        del self.source
        del self.sink
        del self.handoffs
        TestCase.tearDown(self)

    def handoff_cb(self, element, buffer, pad):
        """Record the string form of every buffer handed to the sink."""
        self.handoffs.append(str(buffer))

    def playAndIter(self):
        """Play until EOS shows up on the bus, then pause.

        Returns the list of handoff strings collected during this run and
        leaves ``self.handoffs`` empty for the next one.
        """
        self.handoffs = []
        self.pipeline.set_state(gst.STATE_PLAYING)
        self.assertTrue(self.pipeline.set_state(gst.STATE_PLAYING))
        while True:
            # pop() is expected to hand back nothing when the bus is empty,
            # hence the check on msg itself before looking at its type
            msg = self.bus.pop()
            if msg and msg.type == gst.MESSAGE_EOS:
                break
        self.assertTrue(self.pipeline.set_state(gst.STATE_PAUSED))
        handoffs = self.handoffs
        self.handoffs = []
        return handoffs

    def sink_seek(self, offset, method=gst.SEEK_TYPE_SET):
        """Send a flushing byte seek to ``offset`` through the sink.

        ``method`` is the seek type applied to the start position; the stop
        position is left alone.
        """
        self.sink.seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
                       method, offset,
                       gst.SEEK_TYPE_NONE, 0)

    def testSimple(self):
        # with blocksize=1 every character of the file is its own buffer
        handoffs = self.playAndIter()
        self.assertEqual(handoffs, [str(i) for i in range(10)])

    def testSeekCur(self):
        # NOTE(review): only checks that playback still reaches EOS after a
        # seek; nothing is asserted about which buffers arrive, and the
        # offset of 8 is an assumption to confirm
        self.sink_seek(8)
        self.playAndIter()
class TestEmit(TestCase):
    """Emit the 'event' signal on a testhelper object and check its payload."""

    def testEmit(self):
        # named emitter rather than object, which would shadow the builtin
        emitter = testhelper.get_object()
        emitter.connect('event', self._event_cb)

        # First emit through the testhelper module
        testhelper.emit_event(emitter)

        # Then emit from Python
        emitter.emit('event', gst.event_new_eos())

    def _event_cb(self, obj, event):
        """Signal handler: whatever is emitted has to arrive as a gst.Event."""
        self.assertTrue(isinstance(event, gst.Event))
class TestDelayedEventProbe(TestCase):
    # this test:
    # starts a pipeline with only a source
    # adds an event probe to catch the (first) new-segment
    # adds a buffer probe to "autoplug" and send out this event

    def setUp(self):
        """Build a pipeline holding a single ten-buffer fakesrc."""
        TestCase.setUp(self)
        self.pipeline = gst.Pipeline()
        self.src = gst.element_factory_make('fakesrc')
        self.src.set_property('num-buffers', 10)
        self.pipeline.add(self.src)
        self.srcpad = self.src.get_pad('src')

    def tearDown(self):
        """Stop the pipeline and check that we hold the only reference to it."""
        gst.debug('setting pipeline to NULL')
        self.pipeline.set_state(gst.STATE_NULL)
        gst.debug('set pipeline to NULL')
        # FIXME: wait for state change thread to die
        while self.pipeline.__gstrefcount__ > 1:
            gst.debug('waiting for self.pipeline G rc to drop to 1')
            # sleep between polls instead of spinning on the refcount
            time.sleep(0.1)
        self.assertEqual(self.pipeline.__gstrefcount__, 1)

    def testProbe(self):
        self.srcpad.add_event_probe(self._event_probe_cb)
        self._buffer_probe_id = self.srcpad.add_buffer_probe(
            self._buffer_probe_cb)

        # filled in by the probes once the pipeline is running
        self._newsegment = None
        self._eos = None
        self._had_buffer = False

        self.pipeline.set_state(gst.STATE_PLAYING)

        # the event probe stores the EOS event when the source is done;
        # wait for that before inspecting anything
        while not self._eos:
            time.sleep(0.1)

        # verify if our newsegment event is still around and valid
        self.assertTrue(self._newsegment)
        self.assertEqual(self._newsegment.type, gst.EVENT_NEWSEGMENT)
        self.assertEqual(self._newsegment.__grefcount__, 1)

        # verify if our eos event is still around and valid
        self.assertTrue(self._eos)
        self.assertEqual(self._eos.type, gst.EVENT_EOS)
        self.assertEqual(self._eos.__grefcount__, 1)

    def _event_probe_cb(self, pad, event):
        """Event probe: hold back new-segment, remember EOS, pass latency.

        Returns False to drop the event and True to let it continue; any
        other event type fails the test.
        """
        if event.type == gst.EVENT_NEWSEGMENT:
            self._newsegment = event
            self.assertEqual(event.__grefcount__, 3)
            # drop the event, we're storing it for later sending
            return False

        if event.type == gst.EVENT_EOS:
            self._eos = event
            # we also want fakesink to get it
            return True

        # sinks now send Latency events upstream
        if event.type == gst.EVENT_LATENCY:
            return True

        self.fail("Got an unknown event %r" % event)

    def _buffer_probe_cb(self, pad, buffer):
        """Buffer probe: on the first buffer, plug in a sink and replay the
        stored new-segment event to it, then remove this probe.

        Returns True so the buffer itself carries on downstream.
        """
        self.assertTrue(self._newsegment)

        # fake autoplugging by now putting in a fakesink
        sink = gst.element_factory_make('fakesink')
        self.pipeline.add(sink)
        self.src.link(sink)
        sink.set_state(gst.STATE_PLAYING)

        pad = sink.get_pad('sink')
        pad.send_event(self._newsegment)

        # we don't want to be called again
        self.srcpad.remove_buffer_probe(self._buffer_probe_id)

        self._had_buffer = True
        # now let the buffer through
        return True
class TestEventCreationParsing(TestCase):
    """Create events and check that they parse back to the same values."""

    def testEventStep(self):
        # step events are not available in every GStreamer version; there
        # is nothing to check when the bindings lack parse_step
        if not hasattr(gst.Event, "parse_step"):
            return

        e = gst.event_new_step(gst.FORMAT_TIME, 42, 1.0, True, True)

        self.assertEqual(e.type, gst.EVENT_STEP)

        fmt, amount, rate, flush, intermediate = e.parse_step()
        self.assertEqual(fmt, gst.FORMAT_TIME)
        self.assertEqual(amount, 42)
        self.assertEqual(rate, 1.0)
        self.assertEqual(flush, True)
        self.assertEqual(intermediate, True)
# Run every test case in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()