Upstream version 11.40.271.0
[platform/framework/web/crosswalk.git] / src / third_party / typ / typ / fakes / host_fake.py
1 # Copyright 2014 Dirk Pranke. All rights reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import copy
16 import io
17 import logging
18 import sys
19
20 from typ.host import _TeedStream
21
22
23 is_python3 = bool(sys.version_info.major == 3)
24
25 if is_python3:  # pragma: python3
26     # redefining built-in 'unicode' pylint: disable=W0622
27     unicode = str
28
29
30 class FakeHost(object):
31     # "too many instance attributes" pylint: disable=R0902
32     # "redefining built-in" pylint: disable=W0622
33     # "unused arg" pylint: disable=W0613
34
35     python_interpreter = 'python'
36     is_python3 = bool(sys.version_info.major == 3)
37
38     def __init__(self):
39         self.logger = logging.getLogger()
40         self.stdin = io.StringIO()
41         self.stdout = io.StringIO()
42         self.stderr = io.StringIO()
43         self.platform = 'linux2'
44         self.env = {}
45         self.sep = '/'
46         self.dirs = set([])
47         self.files = {}
48         self.fetches = []
49         self.fetch_responses = {}
50         self.written_files = {}
51         self.last_tmpdir = None
52         self.current_tmpno = 0
53         self.mtimes = {}
54         self.cmds = []
55         self.cwd = '/tmp'
56         self._orig_logging_handlers = []
57
58     def __getstate__(self):
59         d = copy.copy(self.__dict__)
60         del d['stderr']
61         del d['stdout']
62         del d['stdin']
63         del d['logger']
64         del d['_orig_logging_handlers']
65         return d
66
67     def __setstate__(self, d):
68         for k, v in d.items():
69             setattr(self, k, v)
70         self.logger = logging.getLogger()
71         self.stdin = io.StringIO()
72         self.stdout = io.StringIO()
73         self.stderr = io.StringIO()
74
75     def abspath(self, *comps):
76         relpath = self.join(*comps)
77         if relpath.startswith('/'):
78             return relpath
79         return self.join(self.cwd, relpath)
80
81     def add_to_path(self, *comps):
82         absolute_path = self.abspath(*comps)
83         if absolute_path not in sys.path:
84             sys.path.append(absolute_path)
85
86     def basename(self, path):
87         return path.split(self.sep)[-1]
88
89     def call(self, argv, stdin=None, env=None):
90         self.cmds.append(argv)
91         return 0, '', ''
92
93     def call_inline(self, argv):
94         return self.call(argv)[0]
95
96     def chdir(self, *comps):
97         path = self.join(*comps)
98         if not path.startswith('/'):
99             path = self.join(self.cwd, path)
100         self.cwd = path
101
102     def cpu_count(self):
103         return 1
104
105     def dirname(self, path):
106         return '/'.join(path.split('/')[:-1])
107
108     def exists(self, *comps):
109         path = self.abspath(*comps)
110         return ((path in self.files and self.files[path] is not None) or
111                 path in self.dirs)
112
113     def files_under(self, top):
114         files = []
115         top = self.abspath(top)
116         for f in self.files:
117             if self.files[f] is not None and f.startswith(top):
118                 files.append(self.relpath(f, top))
119         return files
120
121     def for_mp(self):
122         return self
123
124     def getcwd(self):
125         return self.cwd
126
127     def getenv(self, key, default=None):
128         return self.env.get(key, default)
129
130     def getpid(self):
131         return 1
132
133     def isdir(self, *comps):
134         path = self.abspath(*comps)
135         return path in self.dirs
136
137     def isfile(self, *comps):
138         path = self.abspath(*comps)
139         return path in self.files and self.files[path] is not None
140
141     def join(self, *comps):
142         p = ''
143         for c in comps:
144             if c in ('', '.'):
145                 continue
146             elif c.startswith('/'):
147                 p = c
148             elif p:
149                 p += '/' + c
150             else:
151                 p = c
152
153         # Handle ./
154         p = p.replace('/./', '/')
155
156         # Handle ../
157         while '/..' in p:
158             comps = p.split('/')
159             idx = comps.index('..')
160             comps = comps[:idx-1] + comps[idx+1:]
161             p = '/'.join(comps)
162         return p
163
164     def maybe_mkdir(self, *comps):
165         path = self.abspath(self.join(*comps))
166         if path not in self.dirs:
167             self.dirs.add(path)
168
169     def mktempfile(self, delete=True):
170         curno = self.current_tmpno
171         self.current_tmpno += 1
172         f = io.StringIO()
173         f.name = '__im_tmp/tmpfile_%u' % curno
174         return f
175
176     def mkdtemp(self, suffix='', prefix='tmp', dir=None, **_kwargs):
177         if dir is None:
178             dir = self.sep + '__im_tmp'
179         curno = self.current_tmpno
180         self.current_tmpno += 1
181         self.last_tmpdir = self.join(dir, '%s_%u_%s' % (prefix, curno, suffix))
182         self.dirs.add(self.last_tmpdir)
183         return self.last_tmpdir
184
185     def mtime(self, *comps):
186         return self.mtimes.get(self.join(*comps), 0)
187
188     def print_(self, msg='', end='\n', stream=None):
189         stream = stream or self.stdout
190         stream.write(msg + end)
191         stream.flush()
192
193     def read_binary_file(self, *comps):
194         return self._read(comps)
195
196     def read_text_file(self, *comps):
197         return self._read(comps)
198
199     def _read(self, comps):
200         return self.files[self.abspath(*comps)]
201
202     def realpath(self, *comps):
203         return self.abspath(*comps)
204
205     def relpath(self, path, start):
206         return path.replace(start + '/', '')
207
208     def remove(self, *comps):
209         path = self.abspath(*comps)
210         self.files[path] = None
211         self.written_files[path] = None
212
213     def rmtree(self, *comps):
214         path = self.abspath(*comps)
215         for f in self.files:
216             if f.startswith(path):
217                 self.files[f] = None
218                 self.written_files[f] = None
219         self.dirs.remove(path)
220
221     def terminal_width(self):
222         return 80
223
224     def splitext(self, path):
225         idx = path.rfind('.')
226         if idx == -1:
227             return (path, '')
228         return (path[:idx], path[idx:])
229
230     def time(self):
231         return 0
232
233     def write_binary_file(self, path, contents):
234         self._write(path, contents)
235
236     def write_text_file(self, path, contents):
237         self._write(path, contents)
238
239     def _write(self, path, contents):
240         full_path = self.abspath(path)
241         self.maybe_mkdir(self.dirname(full_path))
242         self.files[full_path] = contents
243         self.written_files[full_path] = contents
244
245     def fetch(self, url, data=None, headers=None):
246         resp = self.fetch_responses.get(url, FakeResponse(unicode(''), url))
247         self.fetches.append((url, data, headers, resp))
248         return resp
249
250     def _tap_output(self):
251         self.stdout = _TeedStream(self.stdout)
252         self.stderr = _TeedStream(self.stderr)
253         if True:
254             sys.stdout = self.stdout
255             sys.stderr = self.stderr
256
257     def _untap_output(self):
258         assert isinstance(self.stdout, _TeedStream)
259         self.stdout = self.stdout.stream
260         self.stderr = self.stderr.stream
261         if True:
262             sys.stdout = self.stdout
263             sys.stderr = self.stderr
264
265     def capture_output(self, divert=True):
266         self._tap_output()
267         self._orig_logging_handlers = self.logger.handlers
268         if self._orig_logging_handlers:
269             self.logger.handlers = [logging.StreamHandler(self.stderr)]
270         self.stdout.capture(divert=divert)
271         self.stderr.capture(divert=divert)
272
273     def restore_output(self):
274         assert isinstance(self.stdout, _TeedStream)
275         out, err = (self.stdout.restore(), self.stderr.restore())
276         self.logger.handlers = self._orig_logging_handlers
277         self._untap_output()
278         return out, err
279
280
281 class FakeResponse(io.StringIO):
282
283     def __init__(self, response, url, code=200):
284         io.StringIO.__init__(self, response)
285         self._url = url
286         self.code = code
287
288     def geturl(self):
289         return self._url
290
291     def getcode(self):
292         return self.code