--- /dev/null
+/* createrepo_c - Library of routines for manipulation with repodata
+ * Copyright (C) 2013 Tomas Mlcoch
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
+ * USA.
+ */
+
+#include <Python.h>
+#include <assert.h>
+#include <stddef.h>
+
+#include "xml_file-py.h"
+#include "package-py.h"
+#include "exception-py.h"
+#include "typeconversion.h"
+
+typedef struct {
+ PyObject_HEAD
+ cr_XmlFile *xmlfile;
+} _XmlFileObject;
+
+static PyObject * xmlfile_close(_XmlFileObject *self, void *nothing);
+
+static int
+check_XmlFileStatus(const _XmlFileObject *self)
+{
+ assert(self != NULL);
+ assert(XmlFileObject_Check(self));
+ if (self->xmlfile == NULL) {
+ PyErr_SetString(CrErr_Exception,
+ "Improper createrepo_c XmlFile object (Already closed file?).");
+ return -1;
+ }
+ return 0;
+}
+
+/* Function on the type */
+
+static PyObject *
+xmlfile_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
+{
+ CR_UNUSED(args);
+ CR_UNUSED(kwds);
+ _XmlFileObject *self = (_XmlFileObject *)type->tp_alloc(type, 0);
+ if (self)
+ self->xmlfile = NULL;
+ return (PyObject *)self;
+}
+
+static int
+xmlfile_init(_XmlFileObject *self, PyObject *args, PyObject *kwds)
+{
+ char *path;
+ int type, comtype;
+ GError *err = NULL;
+ PyObject *ret;
+
+ CR_UNUSED(kwds);
+
+ if (!PyArg_ParseTuple(args, "sii|:xmlfile_init", &path, &type, &comtype))
+ return -1;
+
+ /* Check arguments */
+ if (type < 0 || type >= CR_XMLFILE_SENTINEL) {
+ PyErr_SetString(PyExc_ValueError, "Unknown XML file type");
+ return -1;
+ }
+
+ if (comtype < 0 || comtype >= CR_CW_COMPRESSION_SENTINEL) {
+ PyErr_SetString(PyExc_ValueError, "Unknown compression type");
+ return -1;
+ }
+
+ /* Free all previous resources when reinitialization */
+ ret = xmlfile_close(self, NULL);
+ Py_XDECREF(ret);
+ if (ret == NULL) {
+ // Error encountered!
+ return -1;
+ }
+
+ /* Init */
+ self->xmlfile = cr_xmlfile_open(path, type, comtype, &err);
+ if (err) {
+ PyErr_Format(CrErr_Exception, "XmlFile initialization failed: %s", err->message);
+ g_clear_error(&err);
+ return -1;
+ }
+
+ return 0;
+}
+
+static void
+xmlfile_dealloc(_XmlFileObject *self)
+{
+ cr_xmlfile_close(self->xmlfile, NULL);
+ Py_TYPE(self)->tp_free(self);
+}
+
+static PyObject *
+xmlfile_repr(_XmlFileObject *self)
+{
+ char *type;
+
+ switch (self->xmlfile->type) {
+ case CR_XMLFILE_PRIMARY:
+ type = "Primary";
+ break;
+ case CR_XMLFILE_FILELISTS:
+ type = "Filelists";
+ break;
+ case CR_XMLFILE_OTHER:
+ type = "Other";
+ break;
+ default:
+ type = "Unknown";
+ }
+
+ return PyString_FromFormat("<createrepo_c.XmlFile %s object>", type);
+}
+
+/* XmlFile methods */
+
+static PyObject *
+set_num_of_pkgs(_XmlFileObject *self, PyObject *args)
+{
+ long num;
+ GError *err = NULL;
+
+ if (!PyArg_ParseTuple(args, "l:set_num_of_pkgs", &num))
+ return NULL;
+
+ if (check_XmlFileStatus(self))
+ return NULL;
+
+ cr_xmlfile_set_num_of_pkgs(self->xmlfile, num, &err);
+ if (err) {
+ PyErr_Format(CrErr_Exception, "XmlFile set_num_of_pkgs error: %s", err->message);
+ g_clear_error(&err);
+ return NULL;
+ }
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+add_pkg(_XmlFileObject *self, PyObject *args)
+{
+ PyObject *py_pkg;
+ GError *err = NULL;
+
+ if (!PyArg_ParseTuple(args, "O!:add_pkg", &Package_Type, &py_pkg))
+ return NULL;
+
+ if (check_XmlFileStatus(self))
+ return NULL;
+
+ cr_xmlfile_add_pkg(self->xmlfile, Package_FromPyObject(py_pkg), &err);
+ if (err) {
+ PyErr_Format(CrErr_Exception, "Cannot add package: %s", err->message);
+ g_clear_error(&err);
+ return NULL;
+ }
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+add_chunk(_XmlFileObject *self, PyObject *args)
+{
+ char *chunk;
+ GError *err = NULL;
+
+ if (!PyArg_ParseTuple(args, "s:add_chunk", &chunk))
+ return NULL;
+
+ if (check_XmlFileStatus(self))
+ return NULL;
+
+ cr_xmlfile_add_chunk(self->xmlfile, chunk, &err);
+ if (err) {
+ PyErr_Format(CrErr_Exception, "Cannot add chunk: %s", err->message);
+ g_clear_error(&err);
+ return NULL;
+ }
+
+ Py_RETURN_NONE;
+}
+
+static PyObject *
+xmlfile_close(_XmlFileObject *self, void *nothing)
+{
+ GError *err = NULL;
+
+ CR_UNUSED(nothing);
+
+ if (self->xmlfile) {
+ cr_xmlfile_close(self->xmlfile, &err);
+ self->xmlfile = NULL;
+ if (err) {
+ PyErr_Format(CrErr_Exception, "Error while closing: %s", err->message);
+ g_clear_error(&err);
+ return NULL;
+ }
+ }
+ Py_RETURN_NONE;
+}
+
+static struct PyMethodDef xmlfile_methods[] = {
+ {"set_num_of_pkgs", (PyCFunction)set_num_of_pkgs, METH_VARARGS, NULL},
+ {"add_pkg", (PyCFunction)add_pkg, METH_VARARGS, NULL},
+ {"add_chunk", (PyCFunction)add_chunk, METH_VARARGS, NULL},
+ {"close", (PyCFunction)xmlfile_close, METH_NOARGS, NULL},
+ {NULL} /* sentinel */
+};
+
+PyTypeObject XmlFile_Type = {
+ PyObject_HEAD_INIT(NULL)
+ 0, /* ob_size */
+ "_librepo.XmlFile", /* tp_name */
+ sizeof(_XmlFileObject), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor) xmlfile_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ (reprfunc) xmlfile_repr, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT|Py_TPFLAGS_BASETYPE, /* tp_flags */
+ "XmlFile object", /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ PyObject_SelfIter, /* tp_iter */
+ 0, /* tp_iternext */
+ xmlfile_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ (initproc) xmlfile_init, /* tp_init */
+ 0, /* tp_alloc */
+ xmlfile_new, /* tp_new */
+ 0, /* tp_free */
+ 0, /* tp_is_gc */
+};
--- /dev/null
+import unittest
+import shutil
+import tempfile
+import os.path
+import createrepo_c as cr
+
+from fixtures import *
+
+class TestCaseXmlFile(unittest.TestCase):
+
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp(prefix="createrepo_ctest-")
+
+ def tearDown(self):
+ shutil.rmtree(self.tmpdir)
+
+ def test_xmlfile_basic_operations(self):
+ pri = cr.XmlFile(self.tmpdir+"/primary.xml.gz",
+ cr.XMLFILE_PRIMARY,
+ cr.GZ_COMPRESSION)
+ self.assertTrue(pri)
+ self.assertTrue(os.path.isfile(self.tmpdir+"/primary.xml.gz"))
+
+ fil = cr.XmlFile(self.tmpdir+"/filelists.xml.gz",
+ cr.XMLFILE_FILELISTS,
+ cr.GZ_COMPRESSION)
+ self.assertTrue(fil)
+ self.assertTrue(os.path.isfile(self.tmpdir+"/filelists.xml.gz"))
+
+ oth = cr.XmlFile(self.tmpdir+"/other.xml.gz",
+ cr.XMLFILE_OTHER,
+ cr.GZ_COMPRESSION)
+ self.assertTrue(oth)
+ self.assertTrue(os.path.isfile(self.tmpdir+"/other.xml.gz"))
+
+ def test_xmlfile_operations_on_closed_file(self):
+ # Already closed file
+ path = os.path.join(self.tmpdir, "primary.xml.gz")
+ pkg = cr.package_from_rpm(PKG_ARCHER_PATH)
+ self.assertTrue(pkg)
+
+ f = cr.PrimaryXmlFile(path, cr.GZ_COMPRESSION)
+ self.assertTrue(f)
+ self.assertTrue(os.path.isfile(path))
+ f.close()
+ self.assertRaises(cr.CreaterepoCError, f.set_num_of_pkgs, 1)
+ self.assertRaises(cr.CreaterepoCError, f.add_pkg, pkg)
+ self.assertRaises(cr.CreaterepoCError, f.add_chunk, "<chunk>text</chunk>")
+ f.close() # No error should be raised
+ del(f) # No error should be raised
+
+ def test_xmlfile_error_cases(self):
+ path = os.path.join(self.tmpdir, "foofile")
+ self.assertFalse(os.path.exists(path))
+
+ # Bad file type
+ self.assertRaises(ValueError, cr.XmlFile, path, 86, cr.GZ_COMPRESSION)
+ self.assertFalse(os.path.exists(path))
+
+ # Bad compression type
+ self.assertRaises(ValueError, cr.XmlFile, path, cr.XMLFILE_PRIMARY, 678)
+ self.assertFalse(os.path.exists(path))
+
+ # Non existing path
+ self.assertRaises(cr.CreaterepoCError, cr.PrimaryXmlFile,
+ "foobar/foo/xxx/cvydmaticxuiowe")
+
+ # Already existing file
+ open(path, "w").write("foobar")
+ self.assertRaises(cr.CreaterepoCError, cr.PrimaryXmlFile, path)
+
+ def test_xmlfile_no_compression(self):
+ path = os.path.join(self.tmpdir, "primary.xml")
+ f = cr.PrimaryXmlFile(path, cr.NO_COMPRESSION)
+ self.assertTrue(f)
+ self.assertTrue(os.path.isfile(path))
+ f.close()
+
+ content = open(path).read()
+ self.assertEqual(content,
+"""<?xml version="1.0" encoding="UTF-8"?>
+<metadata xmlns="http://linux.duke.edu/metadata/common" xmlns:rpm="http://linux.duke.edu/metadata/rpm" packages="0">
+</metadata>""")
+
+ def test_xmlfile_gz_compression(self):
+ path = os.path.join(self.tmpdir, "primary.xml.gz")
+ f = cr.PrimaryXmlFile(path, cr.GZ_COMPRESSION)
+ self.assertTrue(f)
+ self.assertTrue(os.path.isfile(path))
+ f.close()
+
+ import gzip
+ content = gzip.open(path).read()
+ self.assertEqual(content,
+"""<?xml version="1.0" encoding="UTF-8"?>
+<metadata xmlns="http://linux.duke.edu/metadata/common" xmlns:rpm="http://linux.duke.edu/metadata/rpm" packages="0">
+</metadata>""")
+
+ def test_xmlfile_bz2_compression(self):
+ path = os.path.join(self.tmpdir, "primary.xml.bz2")
+ f = cr.PrimaryXmlFile(path, cr.BZ2_COMPRESSION)
+ self.assertTrue(f)
+ self.assertTrue(os.path.isfile(path))
+ f.close()
+
+ import bz2
+ content = bz2.decompress(open(path).read())
+ self.assertEqual(content,
+"""<?xml version="1.0" encoding="UTF-8"?>
+<metadata xmlns="http://linux.duke.edu/metadata/common" xmlns:rpm="http://linux.duke.edu/metadata/rpm" packages="0">
+</metadata>""")
+
+ def test_xmlfile_xz_compression(self):
+ path = os.path.join(self.tmpdir, "primary.xml.xz")
+ f = cr.PrimaryXmlFile(path, cr.XZ_COMPRESSION)
+ self.assertTrue(f)
+ self.assertTrue(os.path.isfile(path))
+ f.close()
+
+ import subprocess
+ p = subprocess.Popen(["unxz", "--stdout", path], stdout=subprocess.PIPE)
+ content = p.stdout.read()
+ self.assertEqual(content,
+"""<?xml version="1.0" encoding="UTF-8"?>
+<metadata xmlns="http://linux.duke.edu/metadata/common" xmlns:rpm="http://linux.duke.edu/metadata/rpm" packages="0">
+</metadata>""")
+
+ def test_xmlfile_add_pkg(self):
+ pkg = cr.package_from_rpm(PKG_ARCHER_PATH)
+ self.assertTrue(pkg)
+
+ # Primary
+ path = os.path.join(self.tmpdir, "primary.xml")
+ f = cr.PrimaryXmlFile(path, cr.NO_COMPRESSION)
+ self.assertTrue(f)
+ self.assertTrue(os.path.isfile(path))
+ f.add_pkg(pkg)
+ self.assertRaises(TypeError, f.add_pkg, None)
+ self.assertRaises(TypeError, f.add_pkg, 123)
+ self.assertRaises(TypeError, f.add_pkg, "foo")
+ self.assertRaises(TypeError, f.add_pkg, [456])
+ f.close()
+
+ self.assertTrue(os.path.isfile(path))
+ self.assertEqual(open(path).read(),
+"""<?xml version="1.0" encoding="UTF-8"?>
+<metadata xmlns="http://linux.duke.edu/metadata/common" xmlns:rpm="http://linux.duke.edu/metadata/rpm" packages="0">
+<package type="rpm">
+ <name>Archer</name>
+ <arch>x86_64</arch>
+ <version epoch="2" ver="3.4.5" rel="6"/>
+ <checksum type="sha256" pkgid="YES">4e0b775220c67f0f2c1fd2177e626b9c863a098130224ff09778ede25cea9a9e</checksum>
+ <summary>Complex package.</summary>
+ <description>Archer package</description>
+ <packager>Sterling Archer</packager>
+ <url>http://soo_complex_package.eu/</url>
+ <time file="1368446051" build="1365416480"/>
+ <size package="3101" installed="0" archive="544"/>
+ <location href=""/>
+ <format>
+ <rpm:license>GPL</rpm:license>
+ <rpm:vendor>ISIS</rpm:vendor>
+ <rpm:group>Development/Tools</rpm:group>
+ <rpm:buildhost>localhost.localdomain</rpm:buildhost>
+ <rpm:sourcerpm>Archer-3.4.5-6.src.rpm</rpm:sourcerpm>
+ <rpm:header-range start="280" end="2865"/>
+ <rpm:provides>
+ <rpm:entry name="bara" flags="LE" epoch="0" ver="22"/>
+ <rpm:entry name="barb" flags="GE" epoch="0" ver="11.22.33" rel="44"/>
+ <rpm:entry name="barc" flags="EQ" epoch="0" ver="33"/>
+ <rpm:entry name="bard" flags="LT" epoch="0" ver="44"/>
+ <rpm:entry name="bare" flags="GT" epoch="0" ver="55"/>
+ <rpm:entry name="Archer" flags="EQ" epoch="2" ver="3.4.5" rel="6"/>
+ <rpm:entry name="Archer(x86-64)" flags="EQ" epoch="2" ver="3.4.5" rel="6"/>
+ </rpm:provides>
+ <rpm:requires>
+ <rpm:entry name="fooa" flags="LE" epoch="0" ver="2"/>
+ <rpm:entry name="foob" flags="GE" epoch="0" ver="1.0.0" rel="1"/>
+ <rpm:entry name="fooc" flags="EQ" epoch="0" ver="3"/>
+ <rpm:entry name="food" flags="LT" epoch="0" ver="4"/>
+ <rpm:entry name="fooe" flags="GT" epoch="0" ver="5"/>
+ <rpm:entry name="foof" flags="EQ" epoch="0" ver="6" pre="1"/>
+ </rpm:requires>
+ <rpm:conflicts>
+ <rpm:entry name="bba" flags="LE" epoch="0" ver="2222"/>
+ <rpm:entry name="bbb" flags="GE" epoch="0" ver="1111.2222.3333" rel="4444"/>
+ <rpm:entry name="bbc" flags="EQ" epoch="0" ver="3333"/>
+ <rpm:entry name="bbd" flags="LT" epoch="0" ver="4444"/>
+ <rpm:entry name="bbe" flags="GT" epoch="0" ver="5555"/>
+ </rpm:conflicts>
+ <rpm:obsoletes>
+ <rpm:entry name="aaa" flags="LE" epoch="0" ver="222"/>
+ <rpm:entry name="aab" flags="GE" epoch="0" ver="111.2.3" rel="4"/>
+ <rpm:entry name="aac" flags="EQ" epoch="0" ver="333"/>
+ <rpm:entry name="aad" flags="LT" epoch="0" ver="444"/>
+ <rpm:entry name="aae" flags="GT" epoch="0" ver="555"/>
+ </rpm:obsoletes>
+ <file>/usr/bin/complex_a</file>
+ </format>
+</package>
+</metadata>""")
+
+ # Filelists
+ path = os.path.join(self.tmpdir, "filelists.xml")
+ f = cr.FilelistsXmlFile(path, cr.NO_COMPRESSION)
+ self.assertTrue(f)
+ self.assertTrue(os.path.isfile(path))
+ f.add_pkg(pkg)
+ self.assertRaises(TypeError, f.add_pkg, None)
+ self.assertRaises(TypeError, f.add_pkg, 123)
+ self.assertRaises(TypeError, f.add_pkg, "foo")
+ self.assertRaises(TypeError, f.add_pkg, [456])
+ f.close()
+
+ self.assertTrue(os.path.isfile(path))
+ self.assertEqual(open(path).read(),
+"""<?xml version="1.0" encoding="UTF-8"?>
+<filelists xmlns="http://linux.duke.edu/metadata/filelists" packages="0">
+<package pkgid="4e0b775220c67f0f2c1fd2177e626b9c863a098130224ff09778ede25cea9a9e" name="Archer" arch="x86_64">
+ <version epoch="2" ver="3.4.5" rel="6"/>
+ <file>/usr/bin/complex_a</file>
+ <file type="dir">/usr/share/doc/Archer-3.4.5</file>
+ <file>/usr/share/doc/Archer-3.4.5/README</file>
+</package>
+</filelists>""")
+
+ # Other
+ path = os.path.join(self.tmpdir, "other.xml")
+ f = cr.OtherXmlFile(path, cr.NO_COMPRESSION)
+ self.assertTrue(f)
+ self.assertTrue(os.path.isfile(path))
+ f.add_pkg(pkg)
+ self.assertRaises(TypeError, f.add_pkg, None)
+ self.assertRaises(TypeError, f.add_pkg, 123)
+ self.assertRaises(TypeError, f.add_pkg, "foo")
+ self.assertRaises(TypeError, f.add_pkg, [456])
+ f.close()
+
+ self.assertTrue(os.path.isfile(path))
+ self.assertEqual(open(path).read(),
+"""<?xml version="1.0" encoding="UTF-8"?>
+<otherdata xmlns="http://linux.duke.edu/metadata/other" packages="0">
+<package pkgid="4e0b775220c67f0f2c1fd2177e626b9c863a098130224ff09778ede25cea9a9e" name="Archer" arch="x86_64">
+ <version epoch="2" ver="3.4.5" rel="6"/>
+ <changelog author="Tomas Mlcoch <tmlcoch@redhat.com> - 1.1.1-1" date="1334664000">- First changelog.</changelog>
+ <changelog author="Tomas Mlcoch <tmlcoch@redhat.com> - 2.2.2-2" date="1334750400">- That was totally ninja!</changelog>
+ <changelog author="Tomas Mlcoch <tmlcoch@redhat.com> - 3.3.3-3" date="1365422400">- 3. changelog.</changelog>
+</package>
+</otherdata>""")
+
+ def test_xmlfile_add_chunk(self):
+ chunk = " <chunk>Some XML chunk</chunk>\n"
+
+ # Primary
+ path = os.path.join(self.tmpdir, "primary.xml")
+ f = cr.PrimaryXmlFile(path, cr.NO_COMPRESSION)
+ self.assertTrue(f)
+ self.assertTrue(os.path.isfile(path))
+ f.add_chunk(chunk)
+ self.assertRaises(TypeError, f.add_chunk, None)
+ self.assertRaises(TypeError, f.add_chunk, 123)
+ self.assertRaises(TypeError, f.add_chunk, [1])
+ self.assertRaises(TypeError, f.add_chunk, ["foo"])
+ f.close()
+
+ self.assertTrue(os.path.isfile(path))
+ self.assertEqual(open(path).read(),
+"""<?xml version="1.0" encoding="UTF-8"?>
+<metadata xmlns="http://linux.duke.edu/metadata/common" xmlns:rpm="http://linux.duke.edu/metadata/rpm" packages="0">
+ <chunk>Some XML chunk</chunk>
+</metadata>""")
+
+ # Filelists
+ path = os.path.join(self.tmpdir, "filelists.xml")
+ f = cr.FilelistsXmlFile(path, cr.NO_COMPRESSION)
+ self.assertTrue(f)
+ self.assertTrue(os.path.isfile(path))
+ f.add_chunk(chunk)
+ self.assertRaises(TypeError, f.add_chunk, None)
+ self.assertRaises(TypeError, f.add_chunk, 123)
+ self.assertRaises(TypeError, f.add_chunk, [1])
+ self.assertRaises(TypeError, f.add_chunk, ["foo"])
+ f.close()
+
+ self.assertTrue(os.path.isfile(path))
+ self.assertEqual(open(path).read(),
+"""<?xml version="1.0" encoding="UTF-8"?>
+<filelists xmlns="http://linux.duke.edu/metadata/filelists" packages="0">
+ <chunk>Some XML chunk</chunk>
+</filelists>""")
+
+ # Other
+ path = os.path.join(self.tmpdir, "other.xml")
+ f = cr.OtherXmlFile(path, cr.NO_COMPRESSION)
+ self.assertTrue(f)
+ self.assertTrue(os.path.isfile(path))
+ f.add_chunk(chunk)
+ self.assertRaises(TypeError, f.add_chunk, None)
+ self.assertRaises(TypeError, f.add_chunk, 123)
+ self.assertRaises(TypeError, f.add_chunk, [1])
+ self.assertRaises(TypeError, f.add_chunk, ["foo"])
+ f.close()
+
+ self.assertTrue(os.path.isfile(path))
+ self.assertEqual(open(path).read(),
+"""<?xml version="1.0" encoding="UTF-8"?>
+<otherdata xmlns="http://linux.duke.edu/metadata/other" packages="0">
+ <chunk>Some XML chunk</chunk>
+</otherdata>""")