RuntimeWarning,
stacklevel=2)
+
ExceptHookManager = ExceptHookManagerClass()
gettext.textdomain(gettext_domain)
gettext.bind_textdomain_codeset(gettext_domain, "UTF-8")
+
def _init_logging(level):
if level == "none":
return
- mapping = { "debug": logging.DEBUG,
- "info": logging.INFO,
- "warning": logging.WARNING,
- "error": logging.ERROR,
- "critical": logging.CRITICAL }
+ mapping = {"debug": logging.DEBUG,
+ "info": logging.INFO,
+ "warning": logging.WARNING,
+ "error": logging.ERROR,
+ "critical": logging.CRITICAL}
logging.basicConfig(level=mapping[level],
format='%(asctime)s.%(msecs)03d %(levelname)8s %(name)20s: %(message)s',
datefmt='%H:%M:%S')
logger.debug("logging at level %s", logging.getLevelName(level))
logger.info("using Python %i.%i.%i %s %i", *sys.version_info)
+
def _init_log_option(parser):
choices = ["none", "debug", "info", "warning", "error", "critical"]
parser.add_option("--log-level", "-l",
help=_("Enable logging, possible values: ") + ", ".join(choices))
return parser
+
def main(main_function, option_parser, gettext_domain=None, paths=None):
# FIXME:
def gettext_cache_access(s):
- if not s in d:
+ if s not in d:
d[s] = gettext(s)
return d[s]
setattr(self, name, dir)
+
XDG = _XDGClass()
secs, subsecs = s.split(".")
return int((int(h) * 60 ** 2 + int(m) * 60) * SECOND) + \
- int(secs) * SECOND + int(subsecs)
+ int(secs) * SECOND + int(subsecs)
class DebugLevel (int):
return DebugLevel(self - 1)
+
debug_level_none = DebugLevel("NONE")
debug_level_error = DebugLevel("ERROR")
debug_level_warning = DebugLevel("WARN")
"I": debug_level_info, "W": debug_level_warning,
"E": debug_level_error, " ": debug_level_none}
ANSI = "(?:\x1b\\[[0-9;]*m)?"
- ANSI_PATTERN = (r"\d:\d\d:\d\d\.\d+ " + ANSI +
- r" *\d+" + ANSI +
- r" +0x[0-9a-f]+ +" + ANSI +
- r"([TFLDIEW ])")
+ ANSI_PATTERN = r"\d:\d\d:\d\d\.\d+ " + ANSI + \
+ r" *\d+" + ANSI + \
+ r" +0x[0-9a-f]+ +" + ANSI + \
+ r"([TFLDIEW ])"
BARE_PATTERN = ANSI_PATTERN.replace(ANSI, "")
rexp_bare = re.compile(BARE_PATTERN)
rexp_ansi = re.compile(ANSI_PATTERN)
break
match = rexp_match(line)
if match is None:
- if rexp is rexp_ansi or not "\x1b" in line:
+ if rexp is rexp_ansi or "\x1b" not in line:
continue
match = rexp_ansi.match(line)
def __iter__(self):
- l = len(self)
+ size = len(self)
i = 0
- while i < l:
+ while i < size:
yield self[i]
i += 1
app.run()
+
if __name__ == "__main__":
main()
"""GStreamer Debug Viewer GUI module."""
-
-def _(s):
- return s
-
import logging
from gi.repository import Gtk, GLib
from GstDebugViewer.GUI.colors import LevelColorThemeTango
from GstDebugViewer.GUI.models import LazyLogModel, LogModelBase
+
+def _(s):
+ return s
+
# Sync with gst-inspector!
before = self.column_order[:pos]
shown_names = [col.name for col in self.columns]
for col_class in before:
- if not col_class.name in shown_names:
+ if col_class.name not in shown_names:
pos -= 1
return pos
col_id = LogModelBase.COL_LEVEL
if mode == self.this_and_above:
- comparison_function = lambda x, y: x < y
+ def comparison_function(x, y):
+ return x < y
else:
comparison_function = get_comparison_function(
mode == self.all_but_this)
# adjust special rows
row[COL_LEVEL] = line_levels[i]
msg_offset = row[COL_MESSAGE]
- row[COL_MESSAGE] = access_offset(offset + msg_offset)
+ row[COL_MESSAGE] = access_offset(offset + msg_offset)
yield (row, offset,)
row[COL_MESSAGE] = msg_offset
class SubRange (object):
- __slots__ = ("l", "start", "stop",)
+ __slots__ = ("size", "start", "stop",)
- def __init__(self, l, start, stop):
+ def __init__(self, size, start, stop):
if start > stop:
raise ValueError(
"need start <= stop (got %r, %r)" % (start, stop,))
- if type(l) == type(self):
+ if type(size) == type(self):
# Another SubRange, don't stack:
- start += l.start
- stop += l.start
- l = l.l
+ start += size.start
+ stop += size.start
+ size = size.size
- self.l = l
+ self.size = size
self.start = start
self.stop = stop
else:
stop += self.stop
- return self.l[i.start + self.start:stop]
+ return self.size[i.start + self.start:stop]
else:
- return self.l[i + self.start]
+ return self.size[i + self.start]
def __len__(self):
def __iter__(self):
- l = self.l
+ size = self.size
for i in range(self.start, self.stop):
- yield l[i]
+ yield size[i]
class LineViewLogModel (FilteredLogModelBase):
"""GStreamer Debug Viewer GUI module."""
-ZOOM_FACTOR = 1.15
-
-
-def _(s):
- return s
-
import os.path
from bisect import bisect_right, bisect_left
import logging
LogModelBase)
+ZOOM_FACTOR = 1.15
+
+
+def _(s):
+ return s
+
+
def action(func):
func.is_action_handler = True
group = Gtk.ActionGroup("MenuActions")
group.add_actions([("AppMenuAction", None, _("_Application")),
- ("ViewMenuAction", None, _("_View")),
- ("ViewColumnsMenuAction", None, _("_Columns")),
- ("HelpMenuAction", None, _("_Help")),
- ("LineViewContextMenuAction", None, "")])
+ ("ViewMenuAction", None, _("_View")),
+ ("ViewColumnsMenuAction", None, _("_Columns")),
+ ("HelpMenuAction", None, _("_Help")),
+ ("LineViewContextMenuAction", None, "")])
self.actions.add_group(group)
group = Gtk.ActionGroup("WindowActions")
dialog = Gtk.FileChooserDialog(None, self.gtk_window,
Gtk.FileChooserAction.OPEN,
- (Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
- Gtk.STOCK_OPEN, Gtk.ResponseType.ACCEPT,))
+ (Gtk.STOCK_CANCEL, Gtk.ResponseType.CANCEL,
+ Gtk.STOCK_OPEN, Gtk.ResponseType.ACCEPT,))
response = dialog.run()
dialog.hide()
if response == Gtk.ResponseType.ACCEPT:
def handle_edit_copy_message_action_activate(self, action):
col_id = LogModelBase.COL_MESSAGE
- self.clipboard.set_text(self.get_active_line()[col_id].decode('utf8'), -1)
+ self.clipboard.set_text(self.get_active_line()[
+ col_id].decode('utf8'), -1)
@action
def handle_enlarge_text_action_activate(self, action):
print("GStreamer Debug Viewer %s" % (version,))
sys.exit(0)
+
class Paths (Common.Main.PathsProgramBase):
program_name = "gst-debug-viewer"
self.action_group.add_actions([("goto-next-search-result",
None, _("Goto Next Match"),
"<Ctrl>G"),
- ("goto-previous-search-result",
- None, _("Goto Previous Match"),
- "<Ctrl><Shift>G")])
+ ("goto-previous-search-result",
+ None, _("Goto Previous Match"),
+ "<Ctrl><Shift>G")])
self.bar = None
self.operation = None
def handle_match_found(self, model, tree_iter):
- if not self.search_state in ("search-forward", "search-backward",):
+ if self.search_state not in ("search-forward", "search-backward",):
self.logger.warning(
"inconsistent search state %r", self.search_state)
return
#!/usr/bin/env python
-def line_string (ts, pid, thread, level, category, filename, line, function,
- object_, message):
+
+def line_string(ts, pid, thread, level, category, filename, line, function,
+ object_, message):
# Replicates gstreamer/gst/gstinfo.c:gst_debug_log_default.
# FIXME: Regarding object_, this doesn't fully replicate the formatting!
- return "%s %5d 0x%x %s %20s %s:%d:%s:<%s> %s" % (Data.time_args (ts), pid, thread,
- level.name.ljust (5), category,
+ return "%s %5d 0x%x %s %20s %s:%d:%s:<%s> %s" % (Data.time_args(ts), pid, thread,
+ level.name.ljust(
+ 5), category,
filename, line, function,
object_, message,)
-def main ():
+
+def main():
import sys
import os.path
- sys.path.append (os.path.dirname (os.path.dirname (sys.argv[0])))
+ sys.path.append(os.path.dirname(os.path.dirname(sys.argv[0])))
global Data
from GstDebugViewer import Data
ts = 0
pid = 12345
- thread = int ("89abcdef", 16)
+ thread = int("89abcdef", 16)
level = Data.debug_level_log
category = "GST_DUMMY"
filename = "gstdummyfilename.c"
Data.debug_level_info,)
shift = 0
- for i in range (count):
+ for i in range(count):
ts = i * 10000
shift += i % (count // 100)
level = levels[(i + shift) % 3]
- print(line_string (ts, pid, thread, level, category, filename, file_line,
- function, object_, message))
+ print(line_string(ts, pid, thread, level, category, filename, file_line,
+ function, object_, message))
+
if __name__ == "__main__":
- main ()
+ main()
from gi.repository import GObject
-sys.path.insert (0, os.path.join (sys.path[0], os.pardir))
+from .. import Common, Data, GUI
-from GstDebugViewer import Common, Data, GUI
class TestParsingPerformance (object):
- def __init__ (self, filename):
+ def __init__(self, filename):
- self.main_loop = GObject.MainLoop ()
- self.log_file = Data.LogFile (filename, Common.Data.DefaultDispatcher ())
- self.log_file.consumers.append (self)
+ self.main_loop = GObject.MainLoop()
+ self.log_file = Data.LogFile(filename, Common.Data.DefaultDispatcher())
+ self.log_file.consumers.append(self)
- def start (self):
+ def start(self):
- self.log_file.start_loading ()
+ self.log_file.start_loading()
- def handle_load_started (self):
+ def handle_load_started(self):
- self.start_time = time.time ()
+ self.start_time = time.time()
- def handle_load_finished (self):
+ def handle_load_finished(self):
- diff = time.time () - self.start_time
+ diff = time.time() - self.start_time
print("line cache built in %0.1f ms" % (diff * 1000.,))
- start_time = time.time ()
- model = GUI.LazyLogModel (self.log_file)
+ start_time = time.time()
+ model = GUI.LazyLogModel(self.log_file)
for row in model:
pass
- diff = time.time () - start_time
+ diff = time.time() - start_time
print("model iterated in %0.1f ms" % (diff * 1000.,))
- print("overall time spent: %0.1f s" % (time.time () - self.start_time,))
+ print("overall time spent: %0.1f s" % (time.time() - self.start_time,))
import resource
- rusage = resource.getrusage (resource.RUSAGE_SELF)
+ rusage = resource.getrusage(resource.RUSAGE_SELF)
print("time spent in user mode: %.2f s" % (rusage.ru_utime,))
print("time spent in system mode: %.2f s" % (rusage.ru_stime,))
-def main ():
- if len (sys.argv) > 1:
- test = TestParsingPerformance (sys.argv[1])
- test.start ()
+def main():
+
+ if len(sys.argv) > 1:
+ test = TestParsingPerformance(sys.argv[1])
+ test.start()
+
if __name__ == "__main__":
- main ()
+ main()
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8; mode: python; -*-
+#
+# GStreamer Debug Viewer - View and analyze GStreamer debug log files
+#
+# Copyright (C) 2007 René Stadler <mail@renestadler.de>
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the Free
+# Software Foundation; either version 3 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+# more details.
+#
+# You should have received a copy of the GNU General Public License along with
+# this program. If not, see <http://www.gnu.org/licenses/>.
+
+"""GStreamer Debug Viewer test suite for the custom tree models."""
+
+import sys
+import os
+import os.path
+from glob import glob
+
+from unittest import TestCase, main as test_main
+
+from .. import Common, Data
+from .. GUI.filters import CategoryFilter, Filter
+from .. GUI.models import (FilteredLogModel,
+ LogModelBase,
+ SubRange,)
+
+
+class TestSubRange (TestCase):
+
+ def test_len(self):
+
+ values = list(range(20))
+
+ sr = SubRange(values, 0, 20)
+ self.assertEqual(len(sr), 20)
+
+ sr = SubRange(values, 10, 20)
+ self.assertEqual(len(sr), 10)
+
+ sr = SubRange(values, 0, 10)
+ self.assertEqual(len(sr), 10)
+
+ sr = SubRange(values, 5, 15)
+ self.assertEqual(len(sr), 10)
+
+ def test_iter(self):
+
+ values = list(range(20))
+
+ sr = SubRange(values, 0, 20)
+ self.assertEqual(list(sr), values)
+
+ sr = SubRange(values, 10, 20)
+ self.assertEqual(list(sr), list(range(10, 20)))
+
+ sr = SubRange(values, 0, 10)
+ self.assertEqual(list(sr), list(range(0, 10)))
+
+ sr = SubRange(values, 5, 15)
+ self.assertEqual(list(sr), list(range(5, 15)))
+
+
+class Model (LogModelBase):
+
+ def __init__(self):
+
+ LogModelBase.__init__(self)
+
+ for i in range(20):
+ self.line_offsets.append(i * 100)
+ self.line_levels.append(Data.debug_level_debug)
+
+ def ensure_cached(self, line_offset):
+
+ pid = line_offset // 100
+ if pid % 2 == 0:
+ category = b"EVEN"
+ else:
+ category = b"ODD"
+
+ line_fmt = (b"0:00:00.000000000 %5i 0x0000000 DEBUG "
+ b"%20s dummy.c:1:dummy: dummy")
+ line_str = line_fmt % (pid, category,)
+ log_line = Data.LogLine.parse_full(line_str)
+ self.line_cache[line_offset] = log_line
+
+ def access_offset(self, line_offset):
+
+ return ""
+
+
+class IdentityFilter (Filter):
+
+ def __init__(self):
+
+ def filter_func(row):
+ return True
+ self.filter_func = filter_func
+
+
+class RandomFilter (Filter):
+
+ def __init__(self, seed):
+
+ import random
+ rand = random.Random()
+ rand.seed(seed)
+
+ def filter_func(row):
+ return rand.choice((True, False,))
+ self.filter_func = filter_func
+
+
+class TestDynamicFilter (TestCase):
+
+ def test_unset_filter_rerange(self):
+
+ full_model = Model()
+ filtered_model = FilteredLogModel(full_model)
+ row_list = self.__row_list
+
+ self.assertEqual(row_list(full_model), list(range(20)))
+ self.assertEqual(row_list(filtered_model), list(range(20)))
+
+ filtered_model.set_range(5, 16)
+
+ self.assertEqual(row_list(filtered_model), list(range(5, 16)))
+
+ def test_identity_filter_rerange(self):
+
+ full_model = Model()
+ filtered_model = FilteredLogModel(full_model)
+ row_list = self.__row_list
+
+ self.assertEqual(row_list(full_model), list(range(20)))
+ self.assertEqual(row_list(filtered_model), list(range(20)))
+
+ filtered_model.add_filter(IdentityFilter(),
+ Common.Data.DefaultDispatcher())
+ filtered_model.set_range(5, 16)
+
+ self.assertEqual(row_list(filtered_model), list(range(5, 16)))
+
+ def test_filtered_range_refilter_skip(self):
+
+ full_model = Model()
+ filtered_model = FilteredLogModel(full_model)
+
+ row_list = self.__row_list
+
+ filtered_model.add_filter(CategoryFilter("EVEN"),
+ Common.Data.DefaultDispatcher())
+ self.__dump_model(filtered_model, "filtered")
+
+ self.assertEqual(row_list(filtered_model), list(range(1, 20, 2)))
+ self.assertEqual([filtered_model.line_index_from_super(i)
+ for i in range(1, 20, 2)],
+ list(range(10)))
+ self.assertEqual([filtered_model.line_index_to_super(i)
+ for i in range(10)],
+ list(range(1, 20, 2)))
+
+ filtered_model.set_range(1, 20)
+ self.__dump_model(filtered_model, "ranged (1, 20)")
+ self.__dump_model(filtered_model, "filtered range")
+
+ self.assertEqual([filtered_model.line_index_from_super(i)
+ for i in range(0, 19, 2)],
+ list(range(10)))
+ self.assertEqual([filtered_model.line_index_to_super(i)
+ for i in range(10)],
+ list(range(1, 20, 2)))
+
+ filtered_model.set_range(2, 20)
+ self.__dump_model(filtered_model, "ranged (2, 20)")
+
+ self.assertEqual(row_list(filtered_model), list(range(3, 20, 2)))
+
+ def test_filtered_range_refilter(self):
+
+ full_model = Model()
+ filtered_model = FilteredLogModel(full_model)
+
+ row_list = self.__row_list
+ rows = row_list(full_model)
+ rows_filtered = row_list(filtered_model)
+
+ self.__dump_model(full_model, "full model")
+
+ self.assertEqual(rows, rows_filtered)
+
+ self.assertEqual([filtered_model.line_index_from_super(i)
+ for i in range(20)],
+ list(range(20)))
+ self.assertEqual([filtered_model.line_index_to_super(i)
+ for i in range(20)],
+ list(range(20)))
+
+ filtered_model.set_range(5, 16)
+ self.__dump_model(filtered_model, "ranged model (5, 16)")
+
+ rows_ranged = row_list(filtered_model)
+ self.assertEqual(rows_ranged, list(range(5, 16)))
+
+ self.__dump_model(filtered_model, "filtered model (nofilter, 5, 15)")
+
+ filtered_model.add_filter(CategoryFilter("EVEN"),
+ Common.Data.DefaultDispatcher())
+ rows_filtered = row_list(filtered_model)
+ self.assertEqual(rows_filtered, list(range(5, 16, 2)))
+
+ self.__dump_model(filtered_model, "filtered model")
+
+ def test_random_filtered_range_refilter(self):
+
+ full_model = Model()
+ filtered_model = FilteredLogModel(full_model)
+ row_list = self.__row_list
+
+ self.assertEqual(row_list(full_model), list(range(20)))
+ self.assertEqual(row_list(filtered_model), list(range(20)))
+
+ filtered_model.add_filter(RandomFilter(538295943),
+ Common.Data.DefaultDispatcher())
+ random_rows = row_list(filtered_model)
+
+ self.__dump_model(filtered_model)
+
+ filtered_model = FilteredLogModel(full_model)
+ filtered_model.add_filter(RandomFilter(538295943),
+ Common.Data.DefaultDispatcher())
+ self.__dump_model(filtered_model, "filtered model")
+ self.assertEqual(row_list(filtered_model), random_rows)
+
+ filtered_model.set_range(1, 10)
+ self.__dump_model(filtered_model)
+ self.assertEqual(row_list(filtered_model), [
+ x for x in range(0, 10) if x in random_rows])
+
+ def __row_list(self, model):
+
+ return [row[Model.COL_PID] for row in model]
+
+ def __dump_model(self, model, comment=None):
+
+ # TODO: Provide a command line option to turn this on and off.
+
+ return
+
+ if not hasattr(model, "super_model"):
+ # Top model.
+ print("\t(%s)" % ("|".join([str(i).rjust(2)
+ for i in self.__row_list(model)]),), end=' ')
+ else:
+ top_model = model.super_model
+ if hasattr(top_model, "super_model"):
+ top_model = top_model.super_model
+ top_indices = self.__row_list(top_model)
+ positions = self.__row_list(model)
+ output = [" "] * len(top_indices)
+ for i, position in enumerate(positions):
+ output[position] = str(i).rjust(2)
+ print("\t(%s)" % ("|".join(output),), end=' ')
+
+ if comment is None:
+ print()
+ else:
+ print(comment)
+
+
+if __name__ == "__main__":
+ test_main()
import os.path
import distutils.cmd
-from distutils.core import setup
+from setuptools import setup
from distutils.command.clean import clean
from distutils.command.build import build
from distutils.command.sdist import sdist
from distutils.command.install_scripts import install_scripts
from distutils.errors import *
-def perform_substitution (filename, values):
- fp = file (filename, "rt")
- data = fp.read ()
- fp.close ()
+def perform_substitution(filename, values):
- for name, value in list(values.items ()):
- data = data.replace ("$%s$" % (name,), value)
+ fp = file(filename, "rt")
+ data = fp.read()
+ fp.close()
- fp = file (filename, "wt")
- fp.write (data)
- fp.close ()
+ for name, value in list(values.items()):
+ data = data.replace("$%s$" % (name,), value)
-class tests (distutils.cmd.Command):
+ fp = file(filename, "wt")
+ fp.write(data)
+ fp.close()
- description = "run unit tests"
-
- user_options = [("files=", "f", "test scripts",)]
-
- def initialize_options (self):
-
- self.files = []
-
- def finalize_options (self):
-
- from glob import glob
-
- if self.files:
- self.files = glob (os.path.join (*self.files.split ("/")))
- else:
- self.files = []
-
- def run (self):
-
- for filename in self.files:
- self.spawn ([sys.executable, filename])
class clean_custom (clean):
- def remove_file (self, path):
+ def remove_file(self, path):
- if os.path.exists (path):
+ if os.path.exists(path):
print("removing '%s'" % (path,))
if not self.dry_run:
- os.unlink (path)
+ os.unlink(path)
- def remove_directory (self, path):
+ def remove_directory(self, path):
from distutils import dir_util
- if os.path.exists (path):
- dir_util.remove_tree (path, dry_run = self.dry_run)
+ if os.path.exists(path):
+ dir_util.remove_tree(path, dry_run=self.dry_run)
- def run (self):
+ def run(self):
- clean.run (self)
+ clean.run(self)
- if os.path.exists ("MANIFEST.in"):
+ if os.path.exists("MANIFEST.in"):
# MANIFEST is generated, get rid of it.
- self.remove_file ("MANIFEST")
+ self.remove_file("MANIFEST")
- pot_file = os.path.join ("po", "gst-debug-viewer.pot")
- self.remove_file (pot_file)
+ pot_file = os.path.join("po", "gst-debug-viewer.pot")
+ self.remove_file(pot_file)
- self.remove_directory ("build")
- self.remove_directory ("dist")
+ self.remove_directory("build")
+ self.remove_directory("dist")
- for path, dirs, files in os.walk ("."):
+ for path, dirs, files in os.walk("."):
for filename in files:
- if filename.endswith (".pyc") or filename.endswith (".pyo"):
- file_path = os.path.join (path, filename)
- self.remove_file (file_path)
+ if filename.endswith(".pyc") or filename.endswith(".pyo"):
+ file_path = os.path.join(path, filename)
+ self.remove_file(file_path)
+
class build_custom (build):
- def build_l10n (self):
+ def build_l10n(self):
return self.l10n
sub_commands = build.sub_commands + [("build_l10n", build_l10n,)]
- user_options = build.user_options + [("l10n", None, "enable translations",)]
+ user_options = build.user_options + \
+ [("l10n", None, "enable translations",)]
boolean_options = build.boolean_options + ["l10n"]
- def initialize_options (self):
+ def initialize_options(self):
- build.initialize_options (self)
+ build.initialize_options(self)
self.l10n = False
+
class build_l10n (distutils.cmd.Command):
# Based on code from python-distutils-extra by Sebastian Heinlein.
("domain=", "d", "gettext domain"),
("bug-contact=", "c", "contact address for msgid bugs")]
- def initialize_options (self):
+ def initialize_options(self):
self.merge_desktop_files = []
self.merge_xml_files = []
self.domain = None
self.bug_contact = None
- def finalize_options (self):
+ def finalize_options(self):
for attr in ("desktop", "xml", "key", "schemas", "rfc822deb",):
- value = getattr (self, "merge_%s_files" % (attr,))
+ value = getattr(self, "merge_%s_files" % (attr,))
if not value:
value = []
else:
- value = eval (value)
- setattr (self, "merge_%s_files" % (attr,), value)
+ value = eval(value)
+ setattr(self, "merge_%s_files" % (attr,), value)
if self.domain is None:
self.domain = self.distribution.metadata.name
- def run (self):
+ def run(self):
from glob import glob
data_files = self.distribution.data_files
- po_makefile = os.path.join ("po", "Makefile")
- if os.path.exists (po_makefile):
- raise DistutilsFileError ("file %s exists (intltool will pick up "
- "values from there)" % (po_makefile,))
+ po_makefile = os.path.join("po", "Makefile")
+ if os.path.exists(po_makefile):
+ raise DistutilsFileError("file %s exists (intltool will pick up "
+ "values from there)" % (po_makefile,))
- cwd = os.getcwd ()
+ cwd = os.getcwd()
if self.bug_contact is not None:
- os.environ["XGETTEXT_ARGS"] = "--msgid-bugs-address=%s" % (self.bug_contact,)
- os.chdir (os.path.join (cwd, "po"))
+ os.environ["XGETTEXT_ARGS"] = "--msgid-bugs-address=%s" % (
+ self.bug_contact,)
+ os.chdir(os.path.join(cwd, "po"))
# Update .pot file.
- self.spawn (["intltool-update", "-p", "-g", self.domain])
+ self.spawn(["intltool-update", "-p", "-g", self.domain])
# Merge new strings into .po files.
- self.spawn (["intltool-update", "-r", "-g", self.domain])
+ self.spawn(["intltool-update", "-r", "-g", self.domain])
- os.chdir (cwd)
+ os.chdir(cwd)
- for po_file in glob (os.path.join ("po", "*.po")):
- lang = os.path.basename (po_file[:-3])
- if lang.startswith ("."):
+ for po_file in glob(os.path.join("po", "*.po")):
+ lang = os.path.basename(po_file[:-3])
+ if lang.startswith("."):
# Hidden file, like auto-save data from an editor.
continue
- mo_dir = os.path.join ("build", "mo", lang, "LC_MESSAGES")
- mo_file = os.path.join (mo_dir, "%s.mo" % (self.domain,))
- self.mkpath (mo_dir)
- self.spawn (["msgfmt", po_file, "-o", mo_file])
+ mo_dir = os.path.join("build", "mo", lang, "LC_MESSAGES")
+ mo_file = os.path.join(mo_dir, "%s.mo" % (self.domain,))
+ self.mkpath(mo_dir)
+ self.spawn(["msgfmt", po_file, "-o", mo_file])
- targetpath = os.path.join ("share", "locale", lang, "LC_MESSAGES")
- data_files.append ((targetpath, (mo_file,)))
+ targetpath = os.path.join("share", "locale", lang, "LC_MESSAGES")
+ data_files.append((targetpath, (mo_file,)))
for parameter, option in ((self.merge_xml_files, "-x",),
(self.merge_desktop_files, "-d",),
if not parameter:
continue
for target, files in parameter:
- build_target = os.path.join ("build", target)
+ build_target = os.path.join("build", target)
for file in files:
- if file.endswith (".in"):
- file_merged = os.path.basename (file[:-3])
+ if file.endswith(".in"):
+ file_merged = os.path.basename(file[:-3])
else:
- file_merged = os.path.basename (file)
+ file_merged = os.path.basename(file)
+
+ self.mkpath(build_target)
+ file_merged = os.path.join(build_target, file_merged)
+ self.spawn(["intltool-merge", option, "po", file, file_merged])
+ data_files.append((target, [file_merged],))
- self.mkpath (build_target)
- file_merged = os.path.join (build_target, file_merged)
- self.spawn (["intltool-merge", option, "po", file, file_merged])
- data_files.append ((target, [file_merged],))
class distcheck (sdist):
description = "verify self-containedness of source distribution"
- def run (self):
+ def run(self):
from distutils import dir_util
from distutils.spawn import spawn
# This creates e.g. dist/gst-debug-viewer-0.1.tar.gz.
- sdist.run (self)
+ sdist.run(self)
- base_dir = self.distribution.get_fullname ()
- distcheck_dir = os.path.join (self.dist_dir, "distcheck")
- self.mkpath (distcheck_dir)
- self.mkpath (os.path.join (distcheck_dir, "again"))
+ base_dir = self.distribution.get_fullname()
+ distcheck_dir = os.path.join(self.dist_dir, "distcheck")
+ self.mkpath(distcheck_dir)
+ self.mkpath(os.path.join(distcheck_dir, "again"))
- cwd = os.getcwd ()
- os.chdir (distcheck_dir)
+ cwd = os.getcwd()
+ os.chdir(distcheck_dir)
- if os.path.isdir (base_dir):
- dir_util.remove_tree (base_dir)
+ if os.path.isdir(base_dir):
+ dir_util.remove_tree(base_dir)
# Unpack tarball into dist/distcheck, creating
# e.g. dist/distcheck/gst-debug-viewer-0.1.
for archive in self.archive_files:
- if archive.endswith (".tar.gz"):
- archive_rel = os.path.join (os.pardir, os.pardir, archive)
- spawn (["tar", "-xzf", archive_rel, base_dir])
+ if archive.endswith(".tar.gz"):
+ archive_rel = os.path.join(os.pardir, os.pardir, archive)
+ spawn(["tar", "-xzf", archive_rel, base_dir])
break
else:
- raise ValueError ("no supported archives were created")
+ raise ValueError("no supported archives were created")
- os.chdir (cwd)
- os.chdir (os.path.join (distcheck_dir, base_dir))
- spawn ([sys.executable, "setup.py", "sdist", "--formats", "gztar"])
+ os.chdir(cwd)
+ os.chdir(os.path.join(distcheck_dir, base_dir))
+ spawn([sys.executable, "setup.py", "sdist", "--formats", "gztar"])
# Unpack tarball into dist/distcheck/again.
- os.chdir (cwd)
- os.chdir (os.path.join (distcheck_dir, "again"))
- archive_rel = os.path.join (os.pardir, base_dir, "dist", "%s.tar.gz" % (base_dir,))
- spawn (["tar", "-xzf", archive_rel, base_dir])
+ os.chdir(cwd)
+ os.chdir(os.path.join(distcheck_dir, "again"))
+ archive_rel = os.path.join(
+ os.pardir, base_dir, "dist", "%s.tar.gz" % (base_dir,))
+ spawn(["tar", "-xzf", archive_rel, base_dir])
- os.chdir (cwd)
- os.chdir (os.path.join (distcheck_dir, base_dir))
- spawn ([sys.executable, "setup.py", "clean"])
+ os.chdir(cwd)
+ os.chdir(os.path.join(distcheck_dir, base_dir))
+ spawn([sys.executable, "setup.py", "clean"])
- os.chdir (cwd)
- spawn (["diff", "-ru",
- os.path.join (distcheck_dir, base_dir),
- os.path.join (distcheck_dir, "again", base_dir)])
+ os.chdir(cwd)
+ spawn(["diff", "-ru",
+ os.path.join(distcheck_dir, base_dir),
+ os.path.join(distcheck_dir, "again", base_dir)])
if not self.keep_temp:
- dir_util.remove_tree (distcheck_dir)
+ dir_util.remove_tree(distcheck_dir)
+
class install_scripts_custom (install_scripts):
user_options = install_scripts.user_options \
- + [("substitute-files=", None,
- "files to perform substitution on")]
+ + [("substitute-files=", None,
+ "files to perform substitution on")]
- def initialize_options (self):
+ def initialize_options(self):
- install_scripts.initialize_options (self)
+ install_scripts.initialize_options(self)
self.substitute_files = "[]"
- def run (self):
+ def run(self):
from os.path import normpath
- install = self.distribution.get_command_obj ("install")
- install.ensure_finalized ()
+ install = self.distribution.get_command_obj("install")
+ install.ensure_finalized()
- values = {"DATADIR" : install.install_data or "",
- "PREFIX" : install.home or install.prefix or "",
- "SCRIPTSDIR" : self.install_dir or ""}
+ values = {"DATADIR": install.install_data or "",
+ "PREFIX": install.home or install.prefix or "",
+ "SCRIPTSDIR": self.install_dir or ""}
if install.home:
- values["LIBDIR"] = os.path.normpath (install.install_lib)
+ values["LIBDIR"] = os.path.normpath(install.install_lib)
if install.root:
- root = normpath (install.root)
- len_root = len (root)
- for name, value in list(values.items ()):
- if normpath (value).startswith (root):
- values[name] = normpath (value)[len_root:]
+ root = normpath(install.root)
+ len_root = len(root)
+ for name, value in list(values.items()):
+ if normpath(value).startswith(root):
+ values[name] = normpath(value)[len_root:]
# Perform installation as normal...
- install_scripts.run (self)
+ install_scripts.run(self)
if self.dry_run:
return
# ...then substitute in-place:
- for filename in eval (self.substitute_files):
- perform_substitution (os.path.join (self.install_dir, filename), values)
-
-cmdclass = {"build" : build_custom,
- "clean" : clean_custom,
- "install_scripts" : install_scripts_custom,
-
- "build_l10n" : build_l10n,
- "distcheck" : distcheck,
- "tests" : tests}
-
-setup (cmdclass = cmdclass,
-
- packages = ["GstDebugViewer",
- "GstDebugViewer.Common",
- "GstDebugViewer.GUI",
- "GstDebugViewer.Plugins"],
- scripts = ["gst-debug-viewer"],
- data_files = [("share/gst-debug-viewer", ["data/about-dialog.ui",
- "data/main-window.ui",
- "data/menus.ui"],),
- ("share/icons/hicolor/48x48/apps", ["data/gst-debug-viewer.png"],),
- ("share/icons/hicolor/scalable/apps", ["data/gst-debug-viewer.svg"],)],
-
- name = "gst-debug-viewer",
- version = "0.1",
- description = "GStreamer Debug Viewer",
- long_description = """""",
- license = "GNU GPL",
- author = "Rene Stadler",
- author_email = "mail@renestadler.de",
- url = "http://renestadler.de/projects/gst-debug-viewer")
+ for filename in eval(self.substitute_files):
+ perform_substitution(os.path.join(
+ self.install_dir, filename), values)
+
+
+cmdclass = {"build": build_custom,
+ "clean": clean_custom,
+ "install_scripts": install_scripts_custom,
+
+ "build_l10n": build_l10n,
+ "distcheck": distcheck}
+
+setup(cmdclass=cmdclass,
+
+ packages=["GstDebugViewer",
+ "GstDebugViewer.Common",
+ "GstDebugViewer.GUI",
+ "GstDebugViewer.Plugins",
+ "GstDebugViewer.tests"],
+ scripts=["gst-debug-viewer"],
+ data_files=[("share/gst-debug-viewer", ["data/about-dialog.ui",
+ "data/main-window.ui",
+ "data/menus.ui"],),
+ ("share/icons/hicolor/48x48/apps",
+ ["data/gst-debug-viewer.png"],),
+ ("share/icons/hicolor/scalable/apps", ["data/gst-debug-viewer.svg"],)],
+
+ name="gst-debug-viewer",
+ version="0.1",
+ description="GStreamer Debug Viewer",
+ long_description="""""",
+ test_suite="GstDebugViewer.tests",
+ license="GNU GPL",
+ author="Rene Stadler",
+ author_email="mail@renestadler.de",
+ url="http://renestadler.de/projects/gst-debug-viewer")
+++ /dev/null
-#!/usr/bin/env python
-# -*- coding: utf-8; mode: python; -*-
-#
-# GStreamer Debug Viewer - View and analyze GStreamer debug log files
-#
-# Copyright (C) 2007 René Stadler <mail@renestadler.de>
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the Free
-# Software Foundation; either version 3 of the License, or (at your option)
-# any later version.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along with
-# this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""GStreamer Debug Viewer test suite for the custom tree models."""
-
-import sys
-import os
-import os.path
-from glob import glob
-
-sys.path.insert (0, os.path.join (sys.path[0], os.pardir))
-
-from unittest import TestCase, main as test_main
-
-from GstDebugViewer import Common, Data
-from GstDebugViewer.GUI.filters import CategoryFilter, Filter
-from GstDebugViewer.GUI.models import (FilteredLogModel,
- LogModelBase,
- SubRange,)
-
-class TestSubRange (TestCase):
-
- def test_len (self):
-
- l = list(range(20))
-
- sr = SubRange (l, 0, 20)
- self.assertEqual (len (sr), 20)
-
- sr = SubRange (l, 10, 20)
- self.assertEqual (len (sr), 10)
-
- sr = SubRange (l, 0, 10)
- self.assertEqual (len (sr), 10)
-
- sr = SubRange (l, 5, 15)
- self.assertEqual (len (sr), 10)
-
- def test_iter (self):
-
- l = list(range(20))
-
- sr = SubRange (l, 0, 20)
- self.assertEqual (list (sr), l)
-
- sr = SubRange (l, 10, 20)
- self.assertEqual (list (sr), list(range(10, 20)))
-
- sr = SubRange (l, 0, 10)
- self.assertEqual (list (sr), list(range(0, 10)))
-
- sr = SubRange (l, 5, 15)
- self.assertEqual (list (sr), list(range(5, 15)))
-
-class Model (LogModelBase):
-
- def __init__ (self):
-
- LogModelBase.__init__ (self)
-
- for i in range (20):
- self.line_offsets.append (i * 100)
- self.line_levels.append (Data.debug_level_debug)
-
- def ensure_cached (self, line_offset):
-
- pid = line_offset // 100
- if pid % 2 == 0:
- category = b"EVEN"
- else:
- category = b"ODD"
-
- line_fmt = (b"0:00:00.000000000 %5i 0x0000000 DEBUG "
- b"%20s dummy.c:1:dummy: dummy")
- line_str = line_fmt % (pid, category,)
- log_line = Data.LogLine.parse_full (line_str)
- self.line_cache[line_offset] = log_line
-
- def access_offset (self, line_offset):
-
- return ""
-
-class IdentityFilter (Filter):
-
- def __init__ (self):
-
- def filter_func (row):
- return True
- self.filter_func = filter_func
-
-class RandomFilter (Filter):
-
- def __init__ (self, seed):
-
- import random
- rand = random.Random ()
- rand.seed (seed)
- def filter_func (row):
- return rand.choice ((True, False,))
- self.filter_func = filter_func
-
-class TestDynamicFilter (TestCase):
-
- def test_unset_filter_rerange (self):
-
- full_model = Model ()
- filtered_model = FilteredLogModel (full_model)
- row_list = self.__row_list
-
- self.assertEqual (row_list (full_model), list(range(20)))
- self.assertEqual (row_list (filtered_model), list(range(20)))
-
- filtered_model.set_range (5, 16)
-
- self.assertEqual (row_list (filtered_model), list(range(5, 16)))
-
- def test_identity_filter_rerange (self):
-
- full_model = Model ()
- filtered_model = FilteredLogModel (full_model)
- row_list = self.__row_list
-
- self.assertEqual (row_list (full_model), list(range(20)))
- self.assertEqual (row_list (filtered_model), list(range(20)))
-
- filtered_model.add_filter (IdentityFilter (),
- Common.Data.DefaultDispatcher ())
- filtered_model.set_range (5, 16)
-
- self.assertEqual (row_list (filtered_model), list(range(5, 16)))
-
- def test_filtered_range_refilter_skip (self):
-
- full_model = Model ()
- filtered_model = FilteredLogModel (full_model)
-
- row_list = self.__row_list
-
- filtered_model.add_filter (CategoryFilter ("EVEN"),
- Common.Data.DefaultDispatcher ())
- self.__dump_model (filtered_model, "filtered")
-
- self.assertEqual (row_list (filtered_model), list(range(1, 20, 2)))
- self.assertEqual ([filtered_model.line_index_from_super (i)
- for i in range (1, 20, 2)],
- list(range(10)))
- self.assertEqual ([filtered_model.line_index_to_super (i)
- for i in range (10)],
- list(range(1, 20, 2)))
-
- filtered_model.set_range (1, 20)
- self.__dump_model (filtered_model, "ranged (1, 20)")
- self.__dump_model (filtered_model, "filtered range")
-
- self.assertEqual ([filtered_model.line_index_from_super (i)
- for i in range (0, 19, 2)],
- list(range(10)))
- self.assertEqual ([filtered_model.line_index_to_super (i)
- for i in range (10)],
- list(range(1, 20, 2)))
-
- filtered_model.set_range (2, 20)
- self.__dump_model (filtered_model, "ranged (2, 20)")
-
- self.assertEqual (row_list (filtered_model), list(range(3, 20, 2)))
-
- def test_filtered_range_refilter (self):
-
- full_model = Model ()
- filtered_model = FilteredLogModel (full_model)
-
- row_list = self.__row_list
- rows = row_list (full_model)
- rows_filtered = row_list (filtered_model)
-
- self.__dump_model (full_model, "full model")
-
- self.assertEqual (rows, rows_filtered)
-
- self.assertEqual ([filtered_model.line_index_from_super (i)
- for i in range (20)],
- list(range(20)))
- self.assertEqual ([filtered_model.line_index_to_super (i)
- for i in range (20)],
- list(range(20)))
-
- filtered_model.set_range (5, 16)
- self.__dump_model (filtered_model, "ranged model (5, 16)")
-
- rows_ranged = row_list (filtered_model)
- self.assertEqual (rows_ranged, list(range(5, 16)))
-
- self.__dump_model (filtered_model, "filtered model (nofilter, 5, 15)")
-
- filtered_model.add_filter (CategoryFilter ("EVEN"),
- Common.Data.DefaultDispatcher ())
- rows_filtered = row_list (filtered_model)
- self.assertEqual (rows_filtered, list(range(5, 16, 2)))
-
- self.__dump_model (filtered_model, "filtered model")
-
- def test_random_filtered_range_refilter (self):
-
- full_model = Model ()
- filtered_model = FilteredLogModel (full_model)
- row_list = self.__row_list
-
- self.assertEqual (row_list (full_model), list(range(20)))
- self.assertEqual (row_list (filtered_model), list(range(20)))
-
- filtered_model.add_filter (RandomFilter (538295943),
- Common.Data.DefaultDispatcher ())
- random_rows = row_list (filtered_model)
-
- self.__dump_model (filtered_model)
-
- filtered_model = FilteredLogModel (full_model)
- filtered_model.add_filter (RandomFilter (538295943),
- Common.Data.DefaultDispatcher ())
- self.__dump_model (filtered_model, "filtered model")
- self.assertEqual (row_list (filtered_model), random_rows)
-
- filtered_model.set_range (1, 10)
- self.__dump_model (filtered_model)
- self.assertEqual (row_list (filtered_model), [x for x in range (0, 10) if x in random_rows])
-
- def __row_list (self, model):
-
- return [row[Model.COL_PID] for row in model]
-
- def __dump_model (self, model, comment = None):
-
- # TODO: Provide a command line option to turn this on and off.
-
- return
-
- if not hasattr (model, "super_model"):
- # Top model.
- print("\t(%s)" % ("|".join ([str (i).rjust (2) for i in self.__row_list (model)]),), end=' ')
- else:
- top_model = model.super_model
- if hasattr (top_model, "super_model"):
- top_model = top_model.super_model
- top_indices = self.__row_list (top_model)
- positions = self.__row_list (model)
- output = [" "] * len (top_indices)
- for i, position in enumerate (positions):
- output[position] = str (i).rjust (2)
- print("\t(%s)" % ("|".join (output),), end=' ')
-
- if comment is None:
- print()
- else:
- print(comment)
-
-if __name__ == "__main__":
- test_main ()
try:
if not modified_file.endswith(".py"):
continue
- pycodestyle_errors = system('pycodestyle', '--repeat', '--ignore', 'E501,E128', modified_file)
+ pycodestyle_errors = system('pycodestyle', '--repeat', '--ignore', 'E501,E128,W605', modified_file)
if pycodestyle_errors:
if output_message is None:
output_message = NOT_PYCODESTYLE_COMPLIANT_MESSAGE_PRE