Return more appropriate python exceptions instead of CreaterepoCError in some cases.
type = cr_detect_compression(filename, &tmp_err);
if (tmp_err) {
- PyErr_Format(CrErr_Exception, "%s", tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, NULL);
return NULL;
}
return -1;
/* Check arguments */
+
if (mode != CR_CW_MODE_READ && mode != CR_CW_MODE_WRITE) {
PyErr_SetString(PyExc_ValueError, "Bad open mode");
return -1;
} else if (ContentStatObject_Check(py_stat)) {
stat = ContentStat_FromPyObject(py_stat);
} else {
- PyErr_SetString(PyExc_ValueError, "Use ContentStat or None");
+ PyErr_SetString(PyExc_TypeError, "Use ContentStat or None");
return -1;
}
/* Init */
self->f = cr_sopen(path, mode, comtype, stat, &err);
if (err) {
- PyErr_Format(CrErr_Exception, "CrFile initialization failed: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, "CrFile %s init failed: ", path);
return -1;
}
cr_write(self->f, str, len, &tmp_err);
if (tmp_err) {
- PyErr_Format(CrErr_Exception, "%s", tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, NULL);
return NULL;
}
self->py_stat = NULL;
if (tmp_err) {
- PyErr_Format(CrErr_Exception, "Close error: %s", tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, "Close error: ");
return NULL;
}
/* Init */
self->stat = cr_contentstat_new(type, &tmp_err);
if (tmp_err) {
- PyErr_Format(CrErr_Exception, "ContentStat initialization failed: %s",
- tmp_err->message);
- g_error_free(tmp_err);
+ nice_exception(&tmp_err, "ContentStat init failed: ");
return -1;
}
} else if (PyInt_Check(value)) {
val = (gint64) PyInt_AS_LONG(value);
} else {
- PyErr_SetString(PyExc_ValueError, "Number expected!");
+ PyErr_SetString(PyExc_TypeError, "Number expected!");
return -1;
}
cr_ContentStat *rec = self->stat;
} else if (PyInt_Check(value)) {
val = PyInt_AS_LONG(value);
} else {
- PyErr_SetString(PyExc_ValueError, "Number expected!");
+ PyErr_SetString(PyExc_TypeError, "Number expected!");
return -1;
}
cr_ContentStat *rec = self->stat;
if (check_ContentStatStatus(self))
return -1;
if (!PyString_Check(value) && value != Py_None) {
- PyErr_SetString(PyExc_ValueError, "String or None expected!");
+ PyErr_SetString(PyExc_TypeError, "String or None expected!");
return -1;
}
cr_ContentStat *rec = self->stat;
*/
#include <Python.h>
+#include <glib.h>
+#include <glib/gprintf.h>
#include "exception-py.h"
PyObject *CrErr_Exception = NULL;
return 1;
}
+
+void
+nice_exception(GError **err, const char *format, ...)
+{
+ int ret;
+ va_list vl;
+ gchar *message, *usr_message = NULL;
+ PyObject *exception;
+
+ if (format) {
+ // Prepare user message
+ va_start(vl, format);
+ ret = g_vasprintf(&usr_message, format, vl);
+ va_end(vl);
+
+ if (ret < 0) {
+ // vasprintf failed - silently ignore this error
+ g_free(usr_message);
+ usr_message = NULL;
+ }
+ }
+
+ // Prepare whole error message
+ if (usr_message)
+ message = g_strdup_printf("%s%s", usr_message, (*err)->message);
+ else
+ message = g_strdup((*err)->message);
+
+ g_free(usr_message);
+
+ // Select appropriate exception
+ switch ((*err)->code) {
+ case CRE_IO:
+ case CRE_STAT:
+ case CRE_NOFILE:
+ case CRE_NODIR:
+ case CRE_EXISTS:
+ exception = PyExc_IOError;
+ break;
+ case CRE_MEMORY:
+ exception = PyExc_MemoryError;
+ break;
+ case CRE_BADARG:
+ exception = PyExc_ValueError;
+ break;
+ default:
+ exception = CrErr_Exception;
+ }
+
+ g_clear_error(err);
+
+ // Set exception
+ PyErr_SetString(exception, message);
+
+ g_free(message);
+}
int init_exceptions();
+/* Set exception by its return code (e.g., for CRE_IO, CRE_NOFILE, etc. will
+ * be used a build-in python IOError exception) and free the GError.
+ * @param err GError **, must be != NULL
+ * @param format Prefix for the error message.
+ */
+void nice_exception(GError **err, const char *format, ...);
+
#endif
assert(self != NULL);
assert(MetadataObject_Check(self));
if (self->md == NULL) {
- PyErr_SetString(CrErr_Exception, "Improper createrepo_c Metadata object.");
+ PyErr_SetString(PyExc_TypeError, "Improper createrepo_c Metadata object.");
return -1;
}
return 0;
cr_metadata_load_xml(self->md, MetadataLocation_FromPyObject(ml), &tmp_err);
if (tmp_err) {
- PyErr_SetString(CrErr_Exception, tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, NULL);
return NULL;
}
cr_metadata_locate_and_load_xml(self->md, path, &tmp_err);
if (tmp_err) {
- PyErr_SetString(CrErr_Exception, tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, NULL);
return NULL;
}
Py_RETURN_NONE;
/* Init */
self->ml = cr_locate_metadata(repopath, ignore_db, &tmp_err);
if (tmp_err) {
- PyErr_SetString(CrErr_Exception, tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, NULL);
return -1;
}
return 0;
return NULL;
if (!PyString_Check(pykey)) {
- PyErr_SetString(PyExc_ValueError, "String expected!");
+ PyErr_SetString(PyExc_TypeError, "String expected!");
return NULL;
}
cr_compress_file_with_stat(src, dst, type, contentstat, &tmp_err);
if (tmp_err) {
- PyErr_SetString(CrErr_Exception, tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, NULL);
return NULL;
}
PyObject *pypkg;
if (!pkg) {
- PyErr_SetString(PyExc_TypeError, "Expected a cr_Package pointer not NULL.");
+ PyErr_SetString(PyExc_ValueError, "Expected a cr_Package pointer not NULL.");
return NULL;
}
} else if (PyInt_Check(value)) {
val = (gint64) PyInt_AS_LONG(value);
} else {
- PyErr_SetString(PyExc_ValueError, "Number expected!");
+ PyErr_SetString(PyExc_TypeError, "Number expected!");
return -1;
}
cr_Package *pkg = self->package;
if (check_PackageStatus(self))
return -1;
if (!PyString_Check(value) && value != Py_None) {
- PyErr_SetString(PyExc_ValueError, "String or None expected!");
+ PyErr_SetString(PyExc_TypeError, "String or None expected!");
return -1;
}
cr_Package *pkg = self->package;
CheckPyDependency(PyObject *dep)
{
if (!PyTuple_Check(dep) || PyTuple_Size(dep) != 6) {
- PyErr_SetString(PyExc_ValueError, "Element of list has to be a tuple with 6 items.");
+ PyErr_SetString(PyExc_TypeError, "Element of list has to be a tuple with 6 items.");
return 1;
}
return 0;
CheckPyPackageFile(PyObject *dep)
{
if (!PyTuple_Check(dep) || PyTuple_Size(dep) != 3) {
- PyErr_SetString(PyExc_ValueError, "Element of list has to be a tuple with 3 items.");
+ PyErr_SetString(PyExc_TypeError, "Element of list has to be a tuple with 3 items.");
return 1;
}
return 0;
CheckPyChangelogEntry(PyObject *dep)
{
if (!PyTuple_Check(dep) || PyTuple_Size(dep) != 3) {
- PyErr_SetString(PyExc_ValueError, "Element of list has to be a tuple with 3 items.");
+ PyErr_SetString(PyExc_TypeError, "Element of list has to be a tuple with 3 items.");
return 1;
}
return 0;
return -1;
if (!PyList_Check(list)) {
- PyErr_SetString(PyExc_ValueError, "List expected!");
+ PyErr_SetString(PyExc_TypeError, "List expected!");
return -1;
}
return NULL;
}
- if (!g_file_test(filename, G_FILE_TEST_IS_REGULAR)) {
- PyErr_Format(PyExc_IOError, "File %s doesn't exist", filename);
- return NULL;
- }
-
pkg = cr_package_from_rpm(filename, checksum_type, location_href,
location_base, changelog_limit, NULL, &tmp_err);
if (tmp_err) {
- PyErr_Format(CrErr_Exception, "Cannot load %s: %s",
- filename, tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, "Cannot load %s: ", filename);
return NULL;
}
return NULL;
}
- if (!g_file_test(filename, G_FILE_TEST_IS_REGULAR)) {
- PyErr_Format(PyExc_IOError, "File %s doesn't exist", filename);
- return NULL;
- }
-
-
xml_res = cr_xml_from_rpm(filename, checksum_type, location_href,
location_base, changelog_limit, NULL, &tmp_err);
if (tmp_err) {
- PyErr_Format(CrErr_Exception, "Cannot load %s: %s",
- filename, tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, "Cannot load %s: ", filename);
return NULL;
}
GError *tmp_err = NULL;
char *xml = cr_xml_dump_repomd(self->repomd, &tmp_err);
if (tmp_err) {
- PyErr_Format(CrErr_Exception, "dump failed: %s", tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, NULL);
return NULL;
}
py_str = PyStringOrNone_FromString(xml);
CheckPyString(PyObject *dep)
{
if (!PyString_Check(dep)) {
- PyErr_SetString(PyExc_ValueError, "Element of list has to be a string");
+ PyErr_SetString(PyExc_TypeError, "Element of list has to be a string");
return 1;
}
return 0;
CheckPyDistroTag(PyObject *dep)
{
if (!PyTuple_Check(dep) || PyTuple_Size(dep) != 2) {
- PyErr_SetString(PyExc_ValueError, "Element of list has to be a tuple with 2 items.");
+ PyErr_SetString(PyExc_TypeError, "Element of list has to be a tuple with 2 items.");
return 1;
}
return 0;
if (check_RepomdStatus(self))
return -1;
if (!PyString_Check(value) && value != Py_None) {
- PyErr_SetString(PyExc_ValueError, "String or None expected!");
+ PyErr_SetString(PyExc_TypeError, "String or None expected!");
return -1;
}
cr_Repomd *repomd = self->repomd;
return -1;
if (!PyList_Check(list)) {
- PyErr_SetString(PyExc_ValueError, "List expected!");
+ PyErr_SetString(PyExc_TypeError, "List expected!");
return -1;
}
PyObject *py_rec;
if (!rec) {
- PyErr_SetString(PyExc_TypeError, "Expected a cr_RepomdRecord pointer not NULL.");
+ PyErr_SetString(PyExc_ValueError, "Expected a cr_RepomdRecord pointer not NULL.");
return NULL;
}
cr_repomd_record_fill(self->record, checksum_type, &err);
if (err) {
- PyErr_Format(CrErr_Exception, "fill method failed: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
PyObject *compressed_repomdrecord;
GError *err = NULL;
- if (!PyArg_ParseTuple(args, "O!ii:fill",
+ if (!PyArg_ParseTuple(args, "O!ii:compress_and_fill",
&RepomdRecord_Type,
&compressed_repomdrecord,
&checksum_type,
compression_type,
&err);
if (err) {
- PyErr_Format(CrErr_Exception, "compress_and_fill method failed: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
cr_repomd_record_rename_file(self->record, &err);
if (err) {
- PyErr_Format(CrErr_Exception, "rename_file method failed: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
} else if (PyInt_Check(value)) {
val = (gint64) PyInt_AS_LONG(value);
} else {
- PyErr_SetString(PyExc_ValueError, "Number expected!");
+ PyErr_SetString(PyExc_TypeError, "Number expected!");
return -1;
}
cr_RepomdRecord *rec = self->record;
} else if (PyInt_Check(value)) {
val = PyInt_AS_LONG(value);
} else {
- PyErr_SetString(PyExc_ValueError, "Number expected!");
+ PyErr_SetString(PyExc_TypeError, "Number expected!");
return -1;
}
cr_RepomdRecord *rec = self->record;
if (check_RepomdRecordStatus(self))
return -1;
if (!PyString_Check(value) && value != Py_None) {
- PyErr_SetString(PyExc_ValueError, "String or None expected!");
+ PyErr_SetString(PyExc_TypeError, "String or None expected!");
return -1;
}
cr_RepomdRecord *rec = self->record;
/* Check arguments */
if (db_type < CR_DB_PRIMARY || db_type >= CR_DB_SENTINEL) {
- PyErr_SetString(PyExc_ValueError, "Unknown type value");
+ PyErr_SetString(PyExc_ValueError, "Unknown db type");
return -1;
}
/* Init */
self->db = cr_db_open(path, db_type, &err);
if (err) {
- PyErr_Format(CrErr_Exception, "Sqlite initialization failed: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return -1;
}
cr_db_add_pkg(self->db, Package_FromPyObject(py_pkg), &err);
if (err) {
- PyErr_Format(CrErr_Exception, "Cannot add package: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
cr_db_dbinfo_update(self->db, checksum, &err);
if (err) {
- PyErr_Format(CrErr_Exception, "Sqlite dbinfo_update error: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
cr_db_close(self->db, &err);
self->db = NULL;
if (err) {
- PyErr_Format(CrErr_Exception, "Sqlite close error: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
}
xml = cr_xml_dump_primary(Package_FromPyObject(py_pkg), &err);
if (err) {
- PyErr_Format(CrErr_Exception, "Error while dumping primary xml: %s",
- err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
xml = cr_xml_dump_filelists(Package_FromPyObject(py_pkg), &err);
if (err) {
- PyErr_Format(CrErr_Exception, "Error while dumping filelists xml: %s",
- err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
xml = cr_xml_dump_other(Package_FromPyObject(py_pkg), &err);
if (err) {
- PyErr_Format(CrErr_Exception, "Error while dumping other xml: %s",
- err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
xml_res = cr_xml_dump(Package_FromPyObject(py_pkg), &err);
if (err) {
- PyErr_Format(CrErr_Exception, "Error while dumping xml: %s",
- err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
} else if (ContentStatObject_Check(py_stat)) {
stat = ContentStat_FromPyObject(py_stat);
} else {
- PyErr_SetString(PyExc_ValueError, "Use ContentStat or None");
+ PyErr_SetString(PyExc_TypeError, "Use ContentStat or None");
return -1;
}
/* Init */
self->xmlfile = cr_xmlfile_sopen(path, type, comtype, stat, &err);
if (err) {
- PyErr_Format(CrErr_Exception, "XmlFile initialization failed: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return -1;
}
cr_xmlfile_set_num_of_pkgs(self->xmlfile, num, &err);
if (err) {
- PyErr_Format(CrErr_Exception, "XmlFile set_num_of_pkgs error: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
cr_xmlfile_add_pkg(self->xmlfile, Package_FromPyObject(py_pkg), &err);
if (err) {
- PyErr_Format(CrErr_Exception, "Cannot add package: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
cr_xmlfile_add_chunk(self->xmlfile, chunk, &err);
if (err) {
- PyErr_Format(CrErr_Exception, "Cannot add chunk: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
self->py_stat = NULL;
if (err) {
- PyErr_Format(CrErr_Exception, "Error while closing: %s", err->message);
- g_clear_error(&err);
+ nice_exception(&err, NULL);
return NULL;
}
}
if (py_newpkgcb == Py_None && py_pkgcb == Py_None) {
- PyErr_SetString(PyExc_TypeError, "both pkgcb and newpkgcb cannot be None");
+ PyErr_SetString(PyExc_ValueError, "both pkgcb and newpkgcb cannot be None");
return NULL;
}
Py_XDECREF(cbdata.py_pkg);
if (tmp_err) {
- PyErr_Format(CrErr_Exception, "%s", tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, NULL);
return NULL;
}
}
if (py_newpkgcb == Py_None && py_pkgcb == Py_None) {
- PyErr_SetString(PyExc_TypeError, "both pkgcb and newpkgcb cannot be None");
+ PyErr_SetString(PyExc_ValueError, "both pkgcb and newpkgcb cannot be None");
return NULL;
}
Py_XDECREF(cbdata.py_pkg);
if (tmp_err) {
- PyErr_Format(CrErr_Exception, "%s", tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, NULL);
return NULL;
}
}
if (py_newpkgcb == Py_None && py_pkgcb == Py_None) {
- PyErr_SetString(PyExc_TypeError, "both pkgcb and newpkgcb cannot be None");
+ PyErr_SetString(PyExc_ValueError, "both pkgcb and newpkgcb cannot be None");
return NULL;
}
Py_XDECREF(cbdata.py_pkg);
if (tmp_err) {
- PyErr_Format(CrErr_Exception, "%s", tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, NULL);
return NULL;
}
cr_Repomd *repomd;
GError *tmp_err = NULL;
- if (!PyArg_ParseTuple(args, "sOO:py_xml_parse_repomd",
+ if (!PyArg_ParseTuple(args, "sO!O:py_xml_parse_repomd",
&filename,
+ &Repomd_Type,
&py_repomd,
&py_warningcb)) {
return NULL;
}
- if (!RepomdObject_Check(py_repomd)) {
- PyErr_SetString(PyExc_TypeError, "Bad type");
- return NULL;
- }
-
if (!PyCallable_Check(py_warningcb) && py_warningcb != Py_None) {
PyErr_SetString(PyExc_TypeError, "warningcb must be callable or None");
return NULL;
Py_XDECREF(py_warningcb);
if (tmp_err) {
- PyErr_Format(CrErr_Exception, "%s", tmp_err->message);
- g_clear_error(&tmp_err);
+ nice_exception(&tmp_err, NULL);
return NULL;
}
self.assertFalse(os.path.exists(path))
# Bad contentstat object
- self.assertRaises(ValueError, cr.XmlFile, path,
+ self.assertRaises(TypeError, cr.XmlFile, path,
cr.MODE_READ, cr.GZ_COMPRESSION, "foo")
self.assertFalse(os.path.exists(path))
# Non existing path
- self.assertRaises(cr.CreaterepoCError, cr.CrFile,
+ self.assertRaises(IOError, cr.CrFile,
"foobar/foo/xxx/cvydmaticxuiowe")
def test_crfile_no_compression(self):
def test_compress_file(self):
# Non exist file
- self.assertRaises(cr.CreaterepoCError, cr.compress_file,
+ self.assertRaises(IOError, cr.compress_file,
self.nofile, None, cr.BZ2)
# Compression - use the same name+suffix
self.assertRaises(IOError, cr.package_from_rpm, "./")
# File is not a rpm
- self.assertRaises(cr.CreaterepoCError, cr.package_from_rpm, FILE_BINARY_PATH)
+ self.assertRaises(IOError, cr.package_from_rpm, FILE_BINARY_PATH)
def test_xml_from_rpm(self):
xml = cr.xml_from_rpm(PKG_ARCHER_PATH)
self.assertRaises(IOError, cr.xml_from_rpm, "./")
# File is not a rpm
- self.assertRaises(cr.CreaterepoCError, cr.xml_from_rpm, FILE_BINARY_PATH)
+ self.assertRaises(IOError, cr.xml_from_rpm, FILE_BINARY_PATH)
self.assertFalse(os.path.exists(path))
# Bad contentstat object
- self.assertRaises(ValueError, cr.XmlFile, path,
+ self.assertRaises(TypeError, cr.XmlFile, path,
cr.XMLFILE_PRIMARY, cr.GZ_COMPRESSION, "foo")
self.assertFalse(os.path.exists(path))
# Already existing file
open(path, "w").write("foobar")
- self.assertRaises(cr.CreaterepoCError, cr.PrimaryXmlFile, path)
+ self.assertRaises(IOError, cr.PrimaryXmlFile, path)
def test_xmlfile_no_compression(self):
path = os.path.join(self.tmpdir, "primary.xml")
['fake_bash', 'super_kernel'])
def test_xml_parser_primary_repo02_no_cbs(self):
- self.assertRaises(TypeError,
+ self.assertRaises(ValueError,
cr.xml_parse_primary,
REPO_02_PRIXML, None, None, None, 1)
['fake_bash', 'super_kernel'])
def test_xml_parser_filelists_repo02_no_cbs(self):
- self.assertRaises(TypeError,
+ self.assertRaises(ValueError,
cr.xml_parse_filelists,
REPO_02_FILXML, None, None, None)
['fake_bash', 'super_kernel'])
def test_xml_parser_other_repo02_no_cbs(self):
- self.assertRaises(TypeError,
+ self.assertRaises(ValueError,
cr.xml_parse_other,
REPO_02_OTHXML, None, None, None)
class TestCaseXmlParserRepomd(unittest.TestCase):
+ def test_xml_parser_repomd_bad_repomd_object(self):
+ self.assertRaises(TypeError,
+ cr.xml_parse_repomd,
+ REPO_01_REPOMD, "foo", None)
+
def test_xml_parser_repomd_repo01(self):
warnings = []