Adding gst-python package
[platform/upstream/gst-python.git] / testsuite / old / test_event.py
1 # -*- Mode: Python -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # gst-python - Python bindings for GStreamer
5 # Copyright (C) 2002 David I. Lehn
6 # Copyright (C) 2004 Johan Dahlin
7 # Copyright (C) 2005 Edward Hervey
8 #
9 # This library is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public
11 # License as published by the Free Software Foundation; either
12 # version 2.1 of the License, or (at your option) any later version.
13 #
14 # This library is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 # Lesser General Public License for more details.
18 #
19 # You should have received a copy of the GNU Lesser General Public
20 # License along with this library; if not, write to the Free Software
21 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
22
23 import os
24 import sys
25 import time
26 import tempfile
27
28 from common import gst, unittest, testhelper, TestCase
29
30 class EventTest(TestCase):
31     def setUp(self):
32         TestCase.setUp(self)
33         self.pipeline = gst.parse_launch('fakesrc ! fakesink name=sink')
34         self.sink = self.pipeline.get_by_name('sink')
35         self.pipeline.set_state(gst.STATE_PLAYING)
36
37     def tearDown(self):
38         gst.debug('setting pipeline to NULL')
39         self.pipeline.set_state(gst.STATE_NULL)
40         gst.debug('set pipeline to NULL')
41         # FIXME: wait for state change thread to die
42         while self.pipeline.__gstrefcount__ > 1:
43             gst.debug('waiting for self.pipeline G rc to drop to 1')
44             time.sleep(0.1)
45         self.assertEquals(self.pipeline.__gstrefcount__, 1)
46
47         del self.sink
48         del self.pipeline
49         TestCase.tearDown(self)
50         
51     def testEventSeek(self):
52         # this event only serves to change the rate of data transfer
53         event = gst.event_new_seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
54             gst.SEEK_TYPE_NONE, 0, gst.SEEK_TYPE_NONE, 0)
55         # FIXME: but basesrc goes into an mmap/munmap spree, needs to be fixed
56
57         event = gst.event_new_seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
58             gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_NONE, 0)
59         assert event
60         gst.debug('sending event')
61         self.sink.send_event(event)
62         gst.debug('sent event')
63
64         self.assertEqual(event.parse_seek(), (1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
65             gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_NONE, 0))
66
67     def testWrongEvent(self):
68         buffer = gst.Buffer()
69         self.assertRaises(TypeError, self.sink.send_event, buffer)
70         number = 1
71         self.assertRaises(TypeError, self.sink.send_event, number)
72
73
74 class EventFileSrcTest(TestCase):
75
76    def setUp(self):
77        TestCase.setUp(self)
78        gst.info("start")
79        self.filename = tempfile.mktemp()
80        open(self.filename, 'w').write(''.join(map(str, range(10))))
81        
82        self.pipeline = gst.parse_launch('filesrc name=source location=%s blocksize=1 ! fakesink signal-handoffs=1 name=sink' % self.filename)
83        self.source = self.pipeline.get_by_name('source')
84        self.sink = self.pipeline.get_by_name('sink')
85        self.sigid = self.sink.connect('handoff', self.handoff_cb)
86        self.bus = self.pipeline.get_bus()
87        
88    def tearDown(self):
89        self.pipeline.set_state(gst.STATE_NULL)
90        self.sink.disconnect(self.sigid)
91        if os.path.exists(self.filename):
92            os.remove(self.filename)
93        del self.bus
94        del self.pipeline
95        del self.source
96        del self.sink
97        del self.handoffs
98        TestCase.tearDown(self)
99
100    def handoff_cb(self, element, buffer, pad):
101        self.handoffs.append(str(buffer))
102
103    def playAndIter(self):
104        self.handoffs = []
105        self.pipeline.set_state(gst.STATE_PLAYING)
106        assert self.pipeline.set_state(gst.STATE_PLAYING)
107        while 42:
108            msg = self.bus.pop()
109            if msg and msg.type == gst.MESSAGE_EOS:
110                break
111        assert self.pipeline.set_state(gst.STATE_PAUSED)
112        handoffs = self.handoffs
113        self.handoffs = []
114        return handoffs
115
116    def sink_seek(self, offset, method=gst.SEEK_TYPE_SET):
117        self.sink.seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
118                       method, offset,
119                       gst.SEEK_TYPE_NONE, 0)
120       
121    def testSimple(self):
122        handoffs = self.playAndIter()
123        assert handoffs == map(str, range(10))
124    
125    def testSeekCur(self):
126        self.sink_seek(8)
127        self.playAndIter()
128
129 class TestEmit(TestCase):
130     def testEmit(self):
131         object = testhelper.get_object()
132         object.connect('event', self._event_cb)
133         
134         # First emit from C
135         testhelper.emit_event(object)
136
137         # Then emit from Python
138         object.emit('event', gst.event_new_eos())
139
140     def _event_cb(self, obj, event):
141         assert isinstance(event, gst.Event)
142     
143
144 class TestDelayedEventProbe(TestCase):
145     # this test:
146     # starts a pipeline with only a source
147     # adds an event probe to catch the (first) new-segment
148     # adds a buffer probe to "autoplug" and send out this event
149     def setUp(self):
150         TestCase.setUp(self)
151         self.pipeline = gst.Pipeline()
152         self.src = gst.element_factory_make('fakesrc')
153         self.src.set_property('num-buffers', 10)
154         self.pipeline.add(self.src)
155         self.srcpad = self.src.get_pad('src')
156         
157     def tearDown(self):
158         gst.debug('setting pipeline to NULL')
159         self.pipeline.set_state(gst.STATE_NULL)
160         gst.debug('set pipeline to NULL')
161         # FIXME: wait for state change thread to die
162         while self.pipeline.__gstrefcount__ > 1:
163             gst.debug('waiting for self.pipeline G rc to drop to 1')
164             time.sleep(0.1)
165         self.assertEquals(self.pipeline.__gstrefcount__, 1)
166
167     def testProbe(self):
168         self.srcpad.add_event_probe(self._event_probe_cb)
169         self._buffer_probe_id = self.srcpad.add_buffer_probe(
170             self._buffer_probe_cb)
171
172         self._newsegment = None
173         self._eos = None
174         self._had_buffer = False
175
176         self.pipeline.set_state(gst.STATE_PLAYING)
177
178         while not self._eos:
179             time.sleep(0.1)
180
181         # verify if our newsegment event is still around and valid
182         self.failUnless(self._newsegment)
183         self.assertEquals(self._newsegment.type, gst.EVENT_NEWSEGMENT)
184         self.assertEquals(self._newsegment.__grefcount__, 1)
185
186         # verify if our eos event is still around and valid
187         self.failUnless(self._eos)
188         self.assertEquals(self._eos.type, gst.EVENT_EOS)
189         self.assertEquals(self._eos.__grefcount__, 1)
190  
191     def _event_probe_cb(self, pad, event):
192         if event.type == gst.EVENT_NEWSEGMENT:
193             self._newsegment = event
194             self.assertEquals(event.__grefcount__, 3)
195             # drop the event, we're storing it for later sending
196             return False
197
198         if  event.type == gst.EVENT_EOS:
199             self._eos = event
200             # we also want fakesink to get it
201             return True
202
203         # sinks now send Latency events upstream
204         if event.type == gst.EVENT_LATENCY:
205             return True
206
207         self.fail("Got an unknown event %r" % event)
208
209     def _buffer_probe_cb(self, pad, buffer):
210         self.failUnless(self._newsegment)
211
212         # fake autoplugging by now putting in a fakesink
213         sink = gst.element_factory_make('fakesink')
214         self.pipeline.add(sink)
215         self.src.link(sink)
216         sink.set_state(gst.STATE_PLAYING)
217
218         pad = sink.get_pad('sink')
219         pad.send_event(self._newsegment)
220
221         # we don't want to be called again
222         self.srcpad.remove_buffer_probe(self._buffer_probe_id)
223         
224         self._had_buffer = True
225         # now let the buffer through
226         return True
227
228 class TestEventCreationParsing(TestCase):
229
230     def testEventStep(self):
231         if hasattr(gst.Event, "parse_step"):
232             e = gst.event_new_step(gst.FORMAT_TIME, 42, 1.0, True, True)
233
234             self.assertEquals(e.type, gst.EVENT_STEP)
235
236             fmt, amount, rate, flush, intermediate = e.parse_step()
237             self.assertEquals(fmt, gst.FORMAT_TIME)
238             self.assertEquals(amount, 42)
239             self.assertEquals(rate, 1.0)
240             self.assertEquals(flush, True)
241             self.assertEquals(intermediate, True)
242
243 if __name__ == "__main__":
244     unittest.main()