import random
import string
import time
-import fcntl
-import struct
-import termios
from errors import *
from mic import msger
import runner
-def terminal_width(fd=1):
- """ Get the real terminal width """
- try:
- buf = 'abcdefgh'
- buf = fcntl.ioctl(fd, termios.TIOCGWINSZ, buf)
- return struct.unpack('hhhh', buf)[1]
- except: # IOError
- return 80
-
-def truncate_url(url, width):
- return os.path.basename(url)[0:width]
-
-class TextProgress(object):
- # make the class as singleton
- _instance = None
- def __new__(cls, *args, **kwargs):
- if not cls._instance:
- cls._instance = super(TextProgress, cls).__new__(cls, *args, **kwargs)
-
- return cls._instance
-
- def __init__(self, totalnum = None):
- self.total = totalnum
- self.counter = 1
-
- def start(self, filename, url, *args, **kwargs):
- self.url = url
- self.termwidth = terminal_width()
- msger.info("\r%-*s" % (self.termwidth, " "))
- if self.total is None:
- msger.info("\rRetrieving %s ..." % truncate_url(self.url, self.termwidth - 15))
- else:
- msger.info("\rRetrieving %s [%d/%d] ..." % (truncate_url(self.url, self.termwidth - 25), self.counter, self.total))
-
- def update(self, *args):
- pass
-
- def end(self, *args):
- if self.counter == self.total:
- msger.raw("\n")
-
- if self.total is not None:
- self.counter += 1
-
def find_binary_path(binary):
if os.environ.has_key("PATH"):
paths = os.environ["PATH"].split(":")
msger.info("Loading %s..." % module)
runner.quiet(['modprobe', module])
-def myurlgrab(url, filename, proxies, progress_obj = None):
- from pykickstart.urlgrabber.grabber import URLGrabber, URLGrabError
-
- g = URLGrabber()
- if progress_obj is None:
- progress_obj = TextProgress()
-
- if url.startswith("file:/"):
- file = url.replace("file:", "")
- if not os.path.exists(file):
- raise CreatorError("URLGrabber error: can't find file %s" % file)
- runner.show(['cp', "-f", file, filename])
- else:
- try:
- filename = g.urlgrab(url = url, filename = filename,
- ssl_verify_host = False, ssl_verify_peer = False,
- proxies = proxies, http_headers = (('Pragma', 'no-cache'),), progress_obj = progress_obj)
- except URLGrabError, e:
- raise CreatorError("URLGrabber error: %s" % url)
-
- return filename
-
def get_loop_device(losetupcmd, lofile):
""" Get a lock to synchronize getting a loopback device """
from errors import *
from fs_related import *
+from rpmmisc import myurlgrab
from proxy import get_proxy_for
import runner
# Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import os, sys, re
+import fcntl
+import struct
+import termios
import rpm
from mic import msger
import runner
+def myurlgrab(url, filename, proxies, progress_obj = None):
+ from pykickstart.urlgrabber.grabber import URLGrabber, URLGrabError
+
+ g = URLGrabber()
+ if progress_obj is None:
+ progress_obj = TextProgress()
+
+ if url.startswith("file:/"):
+ file = url.replace("file:", "")
+ if not os.path.exists(file):
+ raise CreatorError("URLGrabber error: can't find file %s" % file)
+ runner.show(['cp', "-f", file, filename])
+ else:
+ try:
+ filename = g.urlgrab(url = url, filename = filename,
+ ssl_verify_host = False, ssl_verify_peer = False,
+ proxies = proxies, http_headers = (('Pragma', 'no-cache'),), progress_obj = progress_obj)
+ except URLGrabError, e:
+ raise CreatorError("URLGrabber error: %s" % url)
+
+ return filename
+
+def terminal_width(fd=1):
+ """ Get the real terminal width """
+ try:
+ buf = 'abcdefgh'
+ buf = fcntl.ioctl(fd, termios.TIOCGWINSZ, buf)
+ return struct.unpack('hhhh', buf)[1]
+ except: # IOError
+ return 80
+
+def truncate_url(url, width):
+ return os.path.basename(url)[0:width]
+
+class TextProgress(object):
+ # make the class as singleton
+ _instance = None
+ def __new__(cls, *args, **kwargs):
+ if not cls._instance:
+ cls._instance = super(TextProgress, cls).__new__(cls, *args, **kwargs)
+
+ return cls._instance
+
+ def __init__(self, totalnum = None):
+ self.total = totalnum
+ self.counter = 1
+
+ def start(self, filename, url, *args, **kwargs):
+ self.url = url
+ self.termwidth = terminal_width()
+ msger.info("\r%-*s" % (self.termwidth, " "))
+ if self.total is None:
+ msger.info("\rRetrieving %s ..." % truncate_url(self.url, self.termwidth - 15))
+ else:
+ msger.info("\rRetrieving %s [%d/%d] ..." % (truncate_url(self.url, self.termwidth - 25), self.counter, self.total))
+
+ def update(self, *args):
+ pass
+
+ def end(self, *args):
+ if self.counter == self.total:
+ msger.raw("\n")
+
+ if self.total is not None:
+ self.counter += 1
+
class RPMInstallCallback:
""" Command line callback class for callbacks from the RPM library.
"""
from mic import msger
from mic.kickstart import ksparser
-from mic.utils import rpmmisc, fs_related as fs
+from mic.utils import rpmmisc
from mic.utils.errors import CreatorError
from mic.imager.baseimager import BaseImageCreator
try:
repos = self.repos.listEnabled()
for repo in repos:
- repo.setCallback(fs.TextProgress(total_count - cached_count))
+ repo.setCallback(rpmmisc.TextProgress(total_count - cached_count))
self.downloadPkgs(dlpkgs)
# FIXME: sigcheck?
from mic import msger
from mic.kickstart import ksparser
-from mic.utils import rpmmisc, fs_related as fs
+from mic.utils import rpmmisc
from mic.utils.proxy import get_proxy_for
from mic.utils.errors import CreatorError
from mic.imager.baseimager import BaseImageCreator
def downloadPkgs(self, package_objects, count):
localpkgs = self.localpkgs.keys()
- progress_obj = fs.TextProgress(count)
+ progress_obj = rpmmisc.TextProgress(count)
for po in package_objects:
if po.name() in localpkgs:
continue
location = location[2:]
url = baseurl + "/%s" % location
try:
- filename = fs.myurlgrab(url, filename, proxies, progress_obj)
+ filename = rpmmisc.myurlgrab(url, filename, proxies, progress_obj)
except CreatorError:
self.close()
raise