+2005-10-14 Edward Hervey <edward@fluendo.com>
+
+ * gst/gst.defs:
+ Update for new API
+ * gst/gst.override:
+ Added unreffing for gst.Registry.get_plugin_list and .get_feature_list
+ * testsuite/common.py:
+ removed crude hack for gst.Registry and gst.Plugin
+
+ * testsuite/test_interface.py:
+ * testsuite/test_caps.py:
+ * testsuite/test_struct.py:
+ * testsuite/test_xml.py:
+ converted to using common's TestCase class
+
+ * testsuite/test_event.py:
+ Enabled/fixed previously non working test
+ * testsuite/test_registry.py:
+ Now uses common's TestCase class,
+ added special case setUp()
+
2005-10-13 Edward Hervey <edward@fluendo.com>
* gst/Makefile.am: (noinst_HEADERS):
)
)
+(define-method get_date
+ (of-object "GstTagList")
+ (c-name "gst_tag_list_get_date")
+ (return-type "gboolean")
+ (parameters
+ '("const-gchar*" "tag")
+ '("GDate**" "value")
+ )
+)
+
+(define-method get_date_index
+ (of-object "GstTagList")
+ (c-name "gst_tag_list_get_date_index")
+ (return-type "gboolean")
+ (parameters
+ '("const-gchar*" "tag")
+ '("guint" "index")
+ '("GDate**" "value")
+ )
+)
+
;; From ../gstreamer/gst/gsttaginterface.h
static PyObject *
_wrap_gst_registry_get_path_list (PyGObject *self)
{
- GstRegistry *registry;
- GList *l, *paths;
- PyObject *list;
+ GstRegistry *registry;
+ GList *l, *paths;
+ PyObject *list;
gint i;
-
- registry = GST_REGISTRY (self->obj);
-
- paths = gst_registry_get_path_list (registry);
-
- list = PyList_New (g_list_length(paths));
- for (l = paths, i = 0; l; l = l->next, ++i) {
- gchar *path = (gchar *) l->data;
- PyList_SetItem (list, i, PyString_FromString(path));
- }
- g_list_free (paths);
-
- return list;
+
+ registry = GST_REGISTRY (self->obj);
+
+ paths = gst_registry_get_path_list (registry);
+
+ list = PyList_New (g_list_length(paths));
+ for (l = paths, i = 0; l; l = l->next, ++i) {
+ gchar *path = (gchar *) l->data;
+ PyList_SetItem (list, i, PyString_FromString(path));
+ }
+ g_list_free (paths);
+
+ return list;
}
%%
list = PyList_New (g_list_length(plugins));
for (l = plugins, i = 0; l; l = l->next, ++i) {
GstPlugin *plugin = (GstPlugin *) l->data;
- PyList_SetItem (list, i, pygobject_new (G_OBJECT (plugin)));
+ PyObject *object = pygstobject_new (G_OBJECT (plugin));
+ gst_object_unref (plugin);
+
+ PyList_SetItem (list, i, object);
}
g_list_free (plugins);
-
+
return list;
}
for (l = features, i = 0; l; l = l->next, ++i) {
GstPluginFeature *feature = (GstPluginFeature *) l->data;
PyList_SetItem (list, i, pygobject_new (G_OBJECT (feature)));
+ gst_object_unref (feature);
}
g_list_free (features);
class TestCase(unittest.TestCase):
_types = [gst.Object, gst.MiniObject]
- _except_types = [gst.Registry, gst.Plugin, gst.PluginFeature]
def gccollect(self):
# run the garbage collector
objs = [o for o in gc.get_objects() if isinstance(o, c)]
new.extend([o for o in objs if o not in self._tracked[c]])
- for u in self._except_types:
- new = [o for o in new if not isinstance(o, u)]
self.failIf(new, new)
#self.failIf(new, ["%r:%d" % (type(o), id(o)) for o in new])
del self._tracked
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import sys
-from common import gst, unittest
+from common import gst, unittest, TestCase
-class CapsTest(unittest.TestCase):
+class CapsTest(TestCase):
def setUp(self):
+ TestCase.setUp(self)
self.caps = gst.caps_from_string('video/x-raw-yuv,width=10,framerate=5.0;video/x-raw-rgb,width=15,framerate=10.0')
self.assertEquals(self.caps.__refcount__, 1)
self.structure = self.caps[0]
self.assertRaises(TypeError, self.sink.send_event, number)
-# FIXME: fix these tests
-#class EventFileSrcTest(unittest.TestCase):
-# # FIXME: properly create temp files
-# filename = '/tmp/gst-python-test-file'
-# def setUp(self):
-# if os.path.exists(self.filename):
-# os.remove(self.filename)
-# open(self.filename, 'w').write(''.join(map(str, range(10))))
-#
-# self.pipeline = gst.parse_launch('filesrc name=source location=%s blocksize=1 ! fakesink signal-handoffs=1 name=sink' % self.filename)
-# self.source = self.pipeline.get_by_name('source')
-# self.sink = self.pipeline.get_by_name('sink')
-# self.sink.connect('handoff', self.handoff_cb)
-# self.bus = self.pipeline.get_bus()
-# self.pipeline.set_state(gst.STATE_PLAYING)
-#
-# def tearDown(self):
-# assert self.pipeline.set_state(gst.STATE_PLAYING)
-# if os.path.exists(self.filename):
-# os.remove(self.filename)
-#
-# def handoff_cb(self, element, buffer, pad):
-# self.handoffs.append(str(buffer))
-#
-# def playAndIter(self):
-# self.handoffs = []
-# assert self.pipeline.set_state(gst.STATE_PLAYING)
-# while 42:
-# msg = self.bus.pop()
-# if msg and msg.type == gst.MESSAGE_EOS:
-# break
-# assert self.pipeline.set_state(gst.STATE_PAUSED)
-# handoffs = self.handoffs
-# self.handoffs = []
-# return handoffs
-#
-# def sink_seek(self, offset, method=gst.SEEK_METHOD_SET):
-# method |= (gst.SEEK_FLAG_FLUSH | gst.FORMAT_BYTES)
-# self.source.send_event(gst.event_new_seek(method, offset))
-# self.source.send_event(gst.Event(gst.EVENT_FLUSH))
-# self.sink.send_event(gst.event_new_seek(method, offset))
-# self.sink.send_event(gst.Event(gst.EVENT_FLUSH))
-#
-# def testSimple(self):
-# handoffs = self.playAndIter()
-# assert handoffs == map(str, range(10))
-#
-# def testSeekCur(self):
-# self.sink_seek(8)
-#
-# #print self.playAndIter()
+class EventFileSrcTest(TestCase):
+ # FIXME: properly create temp files
+ filename = '/tmp/gst-python-test-file'
+ def setUp(self):
+ TestCase.setUp(self)
+ gst.info("start")
+ if os.path.exists(self.filename):
+ os.remove(self.filename)
+ open(self.filename, 'w').write(''.join(map(str, range(10))))
+
+ self.pipeline = gst.parse_launch('filesrc name=source location=%s blocksize=1 ! fakesink signal-handoffs=1 name=sink' % self.filename)
+ self.source = self.pipeline.get_by_name('source')
+ self.sink = self.pipeline.get_by_name('sink')
+ self.sigid = self.sink.connect('handoff', self.handoff_cb)
+ self.bus = self.pipeline.get_bus()
+
+ def tearDown(self):
+ self.pipeline.set_state(gst.STATE_NULL)
+ self.sink.disconnect(self.sigid)
+ if os.path.exists(self.filename):
+ os.remove(self.filename)
+ del self.bus
+ del self.pipeline
+ del self.source
+ del self.sink
+ del self.handoffs
+ TestCase.tearDown(self)
+
+ def handoff_cb(self, element, buffer, pad):
+ self.handoffs.append(str(buffer))
+
+ def playAndIter(self):
+ self.handoffs = []
+ self.pipeline.set_state(gst.STATE_PLAYING)
+ assert self.pipeline.set_state(gst.STATE_PLAYING)
+ while 42:
+ msg = self.bus.pop()
+ if msg and msg.type == gst.MESSAGE_EOS:
+ break
+ assert self.pipeline.set_state(gst.STATE_PAUSED)
+ handoffs = self.handoffs
+ self.handoffs = []
+ return handoffs
+
+ def sink_seek(self, offset, method=gst.SEEK_TYPE_SET):
+ self.sink.seek(1.0, gst.FORMAT_BYTES, gst.SEEK_FLAG_FLUSH,
+ method, offset,
+ gst.SEEK_TYPE_NONE, 0)
+
+ def testSimple(self):
+ handoffs = self.playAndIter()
+ assert handoffs == map(str, range(10))
+
+ def testSeekCur(self):
+ self.sink_seek(8)
+ self.playAndIter()
class TestEmit(TestCase):
def testEmit(self):
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-from common import gst, unittest
+from common import gst, unittest, TestCase
import gobject
-class Availability(unittest.TestCase):
+class Availability(TestCase):
def testXOverlay(self):
assert hasattr(gst.interfaces, 'XOverlay')
assert issubclass(gst.interfaces.XOverlay, gobject.GInterface)
assert hasattr(gst.interfaces, 'Mixer')
assert issubclass(gst.interfaces.Mixer, gobject.GInterface)
-class FunctionCall(unittest.TestCase):
+class FunctionCall(TestCase):
def FIXME_testXOverlay(self):
# obviously a testsuite is not allowed to instantiate this
# since it needs a running X or will fail. find some way to
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import sys
-from common import gst, unittest
+import gc
+from common import gst, unittest, TestCase
+
+class RegistryTest(TestCase):
+ def setUp(self):
+ self.registry = gst.registry_get_default()
+ self.plugins = self.registry.get_plugin_list()
+ TestCase.setUp(self)
-class RegistryTest(unittest.TestCase):
def testGetDefault(self):
- registry = gst.registry_get_default()
+ assert(self.registry)
def testPluginList(self):
- registry = gst.registry_get_default()
- plugins = registry.get_plugin_list()
- names = map(lambda p: p.get_name(), plugins)
+ names = map(lambda p: p.get_name(), self.plugins)
self.failUnless('gstcoreelements' in names)
+ def testGetPathList(self):
+ # FIXME: this returns an empty list; probably due to core;
+ # examine problem
+
+ paths = self.registry.get_path_list()
+
+class RegistryFeatureTest(TestCase):
+ def setUp(self):
+ self.registry = gst.registry_get_default()
+ self.plugins = self.registry.get_plugin_list()
+ self.efeatures = self.registry.get_feature_list(gst.TYPE_ELEMENT_FACTORY)
+ self.tfeatures = self.registry.get_feature_list(gst.TYPE_TYPE_FIND_FACTORY)
+ self.ifeatures = self.registry.get_feature_list(gst.TYPE_INDEX_FACTORY)
+ TestCase.setUp(self)
+
def testFeatureList(self):
- registry = gst.registry_get_default()
- self.assertRaises(TypeError, registry.get_feature_list, "kaka")
+ self.assertRaises(TypeError, self.registry.get_feature_list, "kaka")
- features = registry.get_feature_list(gst.TYPE_ELEMENT_FACTORY)
- elements = map(lambda f: f.get_name(), features)
+ elements = map(lambda f: f.get_name(), self.efeatures)
self.failUnless('fakesink' in elements)
- features = registry.get_feature_list(gst.TYPE_TYPE_FIND_FACTORY)
- typefinds = map(lambda f: f.get_name(), features)
+ typefinds = map(lambda f: f.get_name(), self.tfeatures)
- features = registry.get_feature_list(gst.TYPE_INDEX_FACTORY)
- indexers = map(lambda f: f.get_name(), features)
+ indexers = map(lambda f: f.get_name(), self.ifeatures)
self.failUnless('memindex' in indexers)
-
- def testGetPathList(self):
- # FIXME: this returns an empty list; probably due to core;
- # examine problem
- registry = gst.registry_get_default()
- paths = registry.get_path_list()
-
+
if __name__ == "__main__":
unittest.main()
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import sys
-from common import gst, unittest
+from common import gst, unittest, TestCase
-class StructureTest(unittest.TestCase):
+class StructureTest(TestCase):
def setUp(self):
+ TestCase.setUp(self)
self.struct = gst.structure_from_string('video/x-raw-yuv,width=10,foo="bar",pixel-aspect-ratio=1/2,framerate=5.0,boolean=(boolean)true')
def testName(self):
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
-from common import gst, unittest
+from common import gst, unittest, TestCase
-class PadTest(unittest.TestCase):
+class PadTest(TestCase):
def testQuery(self):
xml = gst.XML()