1 # Copyright 2014 Dirk Pranke. All rights reserved.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from typ.host import _TeedStream
23 is_python3 = bool(sys.version_info.major == 3)
25 if is_python3: # pragma: python3
26 # redefining built-in 'unicode' pylint: disable=W0622
30 class FakeHost(object):
31 # "too many instance attributes" pylint: disable=R0902
32 # "redefining built-in" pylint: disable=W0622
33 # "unused arg" pylint: disable=W0613
35 python_interpreter = 'python'
36 is_python3 = bool(sys.version_info.major == 3)
39 self.logger = logging.getLogger()
40 self.stdin = io.StringIO()
41 self.stdout = io.StringIO()
42 self.stderr = io.StringIO()
43 self.platform = 'linux2'
49 self.fetch_responses = {}
50 self.written_files = {}
51 self.last_tmpdir = None
52 self.current_tmpno = 0
56 self._orig_logging_handlers = []
58 def __getstate__(self):
59 d = copy.copy(self.__dict__)
64 del d['_orig_logging_handlers']
67 def __setstate__(self, d):
68 for k, v in d.items():
70 self.logger = logging.getLogger()
71 self.stdin = io.StringIO()
72 self.stdout = io.StringIO()
73 self.stderr = io.StringIO()
75 def abspath(self, *comps):
76 relpath = self.join(*comps)
77 if relpath.startswith('/'):
79 return self.join(self.cwd, relpath)
def add_to_path(self, *comps):
    """Make the path built from comps importable via sys.path.

    The components are resolved with self.abspath(); the resulting path
    is appended to sys.path unless it is already listed there.
    """
    target = self.abspath(*comps)
    if target in sys.path:
        # Already registered; do not add a duplicate entry.
        return
    sys.path.append(target)
def basename(self, path):
    """Return the last component of path.

    This is everything after the final occurrence of self.sep, or the
    whole string when path contains no separator.
    """
    _head, _sep, tail = path.rpartition(self.sep)
    return tail
89 def call(self, argv, stdin=None, env=None):
90 self.cmds.append(argv)
def call_inline(self, argv):
    """Run argv via self.call() and return the first item of its result."""
    result = self.call(argv)
    return result[0]
96 def chdir(self, *comps):
97 path = self.join(*comps)
98 if not path.startswith('/'):
99 path = self.join(self.cwd, path)
def dirname(self, path):
    """Return path with its last '/'-separated component removed.

    The result is the text before the final '/', or the empty string
    when path contains no '/'.
    """
    head, _sep, _tail = path.rpartition('/')
    return head
108 def exists(self, *comps):
109 path = self.abspath(*comps)
110 return ((path in self.files and self.files[path] is not None) or
113 def files_under(self, top):
115 top = self.abspath(top)
117 if self.files[f] is not None and f.startswith(top):
118 files.append(self.relpath(f, top))
def getenv(self, key, default=None):
    """Look up key in the fake environment held in self.env.

    Returns default when key is not present.
    """
    environment = self.env
    return environment.get(key, default)
def isdir(self, *comps):
    """Return True if the path built from comps is a known directory.

    The components are resolved with self.abspath() and the result is
    tested for membership in self.dirs.
    """
    candidate = self.abspath(*comps)
    known_dirs = self.dirs
    return candidate in known_dirs
def isfile(self, *comps):
    """Return True if the path built from comps names an existing file.

    A path counts as a file when it has an entry in self.files whose
    value is not None.
    """
    target = self.abspath(*comps)
    entries = self.files
    if target not in entries:
        return False
    return entries[target] is not None
141 def join(self, *comps):
146 elif c.startswith('/'):
154 p = p.replace('/./', '/')
159 idx = comps.index('..')
160 comps = comps[:idx-1] + comps[idx+1:]
164 def maybe_mkdir(self, *comps):
165 path = self.abspath(self.join(*comps))
166 if path not in self.dirs:
169 def mktempfile(self, delete=True):
170 curno = self.current_tmpno
171 self.current_tmpno += 1
173 f.name = '__im_tmp/tmpfile_%u' % curno
176 def mkdtemp(self, suffix='', prefix='tmp', dir=None, **_kwargs):
178 dir = self.sep + '__im_tmp'
179 curno = self.current_tmpno
180 self.current_tmpno += 1
181 self.last_tmpdir = self.join(dir, '%s_%u_%s' % (prefix, curno, suffix))
182 self.dirs.add(self.last_tmpdir)
183 return self.last_tmpdir
def mtime(self, *comps):
    """Return the recorded modification time for the joined path.

    The key is self.join(*comps); paths without an entry in self.mtimes
    report 0.
    """
    key = self.join(*comps)
    return self.mtimes.get(key, 0)
188 def print_(self, msg='', end='\n', stream=None):
189 stream = stream or self.stdout
190 stream.write(msg + end)
def read_binary_file(self, *comps):
    """Return the stored contents for the path built from comps.

    Delegates to self._read(), passing the components as one tuple.
    """
    contents = self._read(comps)
    return contents
def read_text_file(self, *comps):
    """Return the stored contents for the path built from comps.

    Delegates to self._read(), passing the components as one tuple.
    """
    contents = self._read(comps)
    return contents
def _read(self, comps):
    """Return the self.files entry for the path described by comps.

    comps is a sequence of path components; it is resolved with
    self.abspath() and used as the key into self.files. A path with no
    entry raises KeyError from the lookup.
    """
    target = self.abspath(*comps)
    return self.files[target]
def realpath(self, *comps):
    """Return the absolute path for comps.

    This is exactly the result of self.abspath(); no further resolution
    is performed here.
    """
    resolved = self.abspath(*comps)
    return resolved
def relpath(self, path, start):
    """Return path expressed relative to the directory start.

    Only a leading ``start + '/'`` is stripped. A path that does not
    begin with that prefix (including path == start) is returned
    unchanged.
    """
    prefix = start + '/'
    # Strip the prefix only at the front: a plain str.replace() would
    # also delete later occurrences of the same text inside the path
    # (e.g. '/tmp/x/tmp/y' relative to '/tmp' would become 'xy').
    if path.startswith(prefix):
        return path[len(prefix):]
    return path
def remove(self, *comps):
    """Mark the path built from comps as deleted.

    The entry is kept in both self.files and self.written_files, but
    its value is set to None.
    """
    target = self.abspath(*comps)
    for table in (self.files, self.written_files):
        table[target] = None
213 def rmtree(self, *comps):
214 path = self.abspath(*comps)
216 if f.startswith(path):
218 self.written_files[f] = None
219 self.dirs.remove(path)
221 def terminal_width(self):
224 def splitext(self, path):
225 idx = path.rfind('.')
228 return (path[:idx], path[idx:])
def write_binary_file(self, path, contents):
    """Store contents under path by delegating to self._write().

    Nothing is returned.
    """
    store = self._write
    store(path, contents)
def write_text_file(self, path, contents):
    """Store contents under path by delegating to self._write().

    Nothing is returned.
    """
    store = self._write
    store(path, contents)
def _write(self, path, contents):
    """Record contents as the data stored at path.

    The path is resolved with self.abspath(), and its parent directory
    is passed to self.maybe_mkdir() first. The contents are then stored
    in both self.files and self.written_files.
    """
    target = self.abspath(path)
    parent = self.dirname(target)
    self.maybe_mkdir(parent)
    for table in (self.files, self.written_files):
        table[target] = contents
245 def fetch(self, url, data=None, headers=None):
246 resp = self.fetch_responses.get(url, FakeResponse(unicode(''), url))
247 self.fetches.append((url, data, headers, resp))
250 def _tap_output(self):
251 self.stdout = _TeedStream(self.stdout)
252 self.stderr = _TeedStream(self.stderr)
254 sys.stdout = self.stdout
255 sys.stderr = self.stderr
257 def _untap_output(self):
258 assert isinstance(self.stdout, _TeedStream)
259 self.stdout = self.stdout.stream
260 self.stderr = self.stderr.stream
262 sys.stdout = self.stdout
263 sys.stderr = self.stderr
265 def capture_output(self, divert=True):
267 self._orig_logging_handlers = self.logger.handlers
268 if self._orig_logging_handlers:
269 self.logger.handlers = [logging.StreamHandler(self.stderr)]
270 self.stdout.capture(divert=divert)
271 self.stderr.capture(divert=divert)
273 def restore_output(self):
274 assert isinstance(self.stdout, _TeedStream)
275 out, err = (self.stdout.restore(), self.stderr.restore())
276 self.logger.handlers = self._orig_logging_handlers
281 class FakeResponse(io.StringIO):
283 def __init__(self, response, url, code=200):
284 io.StringIO.__init__(self, response)