Imported Upstream version 3.7.3
[platform/upstream/python-gobject.git] / tests / test_iochannel.py
1 # -*- Mode: Python -*-
2 # encoding: UTF-8
3 from __future__ import unicode_literals
4
5 import unittest
6 import tempfile
7 import os.path
8 import fcntl
9 import shutil
10 import warnings
11
12 from gi.repository import GLib
13 from gi import PyGIDeprecationWarning
14
15 from compathelper import _unicode
16
17
18 class IOChannel(unittest.TestCase):
19     def setUp(self):
20         self.workdir = tempfile.mkdtemp()
21
22         self.testutf8 = os.path.join(self.workdir, 'testutf8.txt')
23         with open(self.testutf8, 'wb') as f:
24             f.write('''hello ♥ world
25 second line
26
27 À demain!'''.encode('UTF-8'))
28
29         self.testlatin1 = os.path.join(self.workdir, 'testlatin1.txt')
30         with open(self.testlatin1, 'wb') as f:
31             f.write(b'''hell\xf8 world
32 second line
33
34 \xc0 demain!''')
35
36         self.testout = os.path.join(self.workdir, 'testout.txt')
37
38     def tearDown(self):
39         shutil.rmtree(self.workdir)
40
41     def test_file_readline_utf8(self):
42         ch = GLib.IOChannel(filename=self.testutf8)
43         self.assertEqual(ch.get_encoding(), 'UTF-8')
44         self.assertTrue(ch.get_close_on_unref())
45         self.assertEqual(_unicode(ch.readline()), 'hello ♥ world\n')
46         self.assertEqual(ch.get_buffer_condition(), GLib.IOCondition.IN)
47         self.assertEqual(ch.readline(), 'second line\n')
48         self.assertEqual(ch.readline(), '\n')
49         self.assertEqual(_unicode(ch.readline()), 'À demain!')
50         self.assertEqual(ch.get_buffer_condition(), 0)
51         self.assertEqual(ch.readline(), '')
52         ch.close()
53
54     def test_file_readline_latin1(self):
55         ch = GLib.IOChannel(filename=self.testlatin1, mode='r')
56         ch.set_encoding('latin1')
57         self.assertEqual(ch.get_encoding(), 'latin1')
58         self.assertEqual(_unicode(ch.readline()), 'hellø world\n')
59         self.assertEqual(ch.readline(), 'second line\n')
60         self.assertEqual(ch.readline(), '\n')
61         self.assertEqual(_unicode(ch.readline()), 'À demain!')
62         ch.close()
63
64     def test_file_iter(self):
65         items = []
66         ch = GLib.IOChannel(filename=self.testutf8)
67         for item in ch:
68             items.append(item)
69         self.assertEqual(len(items), 4)
70         self.assertEqual(_unicode(items[0]), 'hello ♥ world\n')
71         ch.close()
72
73     def test_file_readlines(self):
74         ch = GLib.IOChannel(filename=self.testutf8)
75         lines = ch.readlines()
76         # Note, this really ought to be 4, but the static bindings add an extra
77         # empty one
78         self.assertGreaterEqual(len(lines), 4)
79         self.assertLessEqual(len(lines), 5)
80         self.assertEqual(_unicode(lines[0]), 'hello ♥ world\n')
81         self.assertEqual(_unicode(lines[3]), 'À demain!')
82         if len(lines) == 4:
83             self.assertEqual(lines[4], '')
84
85     def test_file_read(self):
86         ch = GLib.IOChannel(filename=self.testutf8)
87         with open(self.testutf8, 'rb') as f:
88             self.assertEqual(ch.read(), f.read())
89
90         ch = GLib.IOChannel(filename=self.testutf8)
91         with open(self.testutf8, 'rb') as f:
92             self.assertEqual(ch.read(10), f.read(10))
93
94         ch = GLib.IOChannel(filename=self.testutf8)
95         with open(self.testutf8, 'rb') as f:
96             self.assertEqual(ch.read(max_count=15), f.read(15))
97
98     def test_seek(self):
99         ch = GLib.IOChannel(filename=self.testutf8)
100         ch.seek(2)
101         self.assertEqual(ch.read(3), b'llo')
102
103         ch.seek(2, 0)  # SEEK_SET
104         self.assertEqual(ch.read(3), b'llo')
105
106         ch.seek(1, 1)  # SEEK_CUR, skip the space
107         self.assertEqual(ch.read(3), b'\xe2\x99\xa5')
108
109         ch.seek(2, 2)  # SEEK_END
110         # FIXME: does not work currently
111         #self.assertEqual(ch.read(2), b'n!')
112
113         # invalid whence value
114         self.assertRaises(ValueError, ch.seek, 0, 3)
115
116     def test_file_write(self):
117         ch = GLib.IOChannel(filename=self.testout, mode='w')
118         ch.set_encoding('latin1')
119         ch.write('hellø world\n')
120         ch.close()
121         ch = GLib.IOChannel(filename=self.testout, mode='a')
122         ch.set_encoding('latin1')
123         ch.write('À demain!')
124         ch.close()
125
126         with open(self.testout, 'rb') as f:
127             self.assertEqual(f.read().decode('latin1'), 'hellø world\nÀ demain!')
128
129     def test_file_writelines(self):
130         ch = GLib.IOChannel(filename=self.testout, mode='w')
131         ch.writelines(['foo', 'bar\n', 'baz\n', 'end'])
132         ch.close()
133
134         with open(self.testout, 'r') as f:
135             self.assertEqual(f.read(), 'foobar\nbaz\nend')
136
137     def test_buffering(self):
138         writer = GLib.IOChannel(filename=self.testout, mode='w')
139         writer.set_encoding(None)
140         self.assertTrue(writer.get_buffered())
141         self.assertGreater(writer.get_buffer_size(), 10)
142
143         reader = GLib.IOChannel(filename=self.testout, mode='r')
144
145         # does not get written immediately on buffering
146         writer.write('abc')
147         self.assertEqual(reader.read(), b'')
148         writer.flush()
149         self.assertEqual(reader.read(), b'abc')
150
151         # does get written immediately without buffering
152         writer.set_buffered(False)
153         writer.write('def')
154         self.assertEqual(reader.read(), b'def')
155
156         # writes after buffer overflow
157         writer.set_buffer_size(10)
158         writer.write('0123456789012')
159         self.assertTrue(reader.read().startswith(b'012'))
160         writer.flush()
161         reader.read()  # ignore bits written after flushing
162
163         # closing flushes
164         writer.set_buffered(True)
165         writer.write('ghi')
166         writer.close()
167         self.assertEqual(reader.read(), b'ghi')
168         reader.close()
169
170     def test_fd_read(self):
171         (r, w) = os.pipe()
172
173         ch = GLib.IOChannel(filedes=r)
174         ch.set_encoding(None)
175         ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
176         self.assertNotEqual(ch.get_flags() | GLib.IOFlags.NONBLOCK, 0)
177         self.assertEqual(ch.read(), b'')
178         os.write(w, b'\x01\x02')
179         self.assertEqual(ch.read(), b'\x01\x02')
180
181         # now test blocking case, after closing the write end
182         ch.set_flags(GLib.IOFlags(ch.get_flags() & ~GLib.IOFlags.NONBLOCK))
183         os.write(w, b'\x03\x04')
184         os.close(w)
185         self.assertEqual(ch.read(), b'\x03\x04')
186
187         ch.close()
188
189     def test_fd_write(self):
190         (r, w) = os.pipe()
191         fcntl.fcntl(r, fcntl.F_SETFL, fcntl.fcntl(r, fcntl.F_GETFL) | os.O_NONBLOCK)
192
193         ch = GLib.IOChannel(filedes=w, mode='w')
194         ch.set_encoding(None)
195         ch.set_buffered(False)
196         ch.write(b'\x01\x02')
197         self.assertEqual(os.read(r, 10), b'\x01\x02')
198
199         # now test blocking case, after closing the write end
200         fcntl.fcntl(r, fcntl.F_SETFL, fcntl.fcntl(r, fcntl.F_GETFL) & ~os.O_NONBLOCK)
201         ch.write(b'\x03\x04')
202         ch.close()
203         self.assertEqual(os.read(r, 10), b'\x03\x04')
204         os.close(r)
205
206     def test_deprecated_method_add_watch_no_data(self):
207         (r, w) = os.pipe()
208
209         ch = GLib.IOChannel(filedes=r)
210         ch.set_encoding(None)
211         ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
212
213         cb_reads = []
214
215         def cb(channel, condition):
216             self.assertEqual(channel, ch)
217             self.assertEqual(condition, GLib.IOCondition.IN)
218             cb_reads.append(channel.read())
219             return True
220
221         # io_add_watch() method is deprecated, use GLib.io_add_watch
222         with warnings.catch_warnings(record=True) as warn:
223             warnings.simplefilter('always')
224             ch.add_watch(GLib.IOCondition.IN, cb)
225             self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
226
227         ml = GLib.MainLoop()
228
229         GLib.timeout_add(10, lambda: os.write(w, b'a') and False)
230         GLib.timeout_add(100, lambda: os.write(w, b'b') and False)
231         GLib.timeout_add(200, ml.quit)
232         ml.run()
233
234         self.assertEqual(cb_reads, [b'a', b'b'])
235
236     def test_deprecated_method_add_watch_data_priority(self):
237         (r, w) = os.pipe()
238
239         ch = GLib.IOChannel(filedes=r)
240         ch.set_encoding(None)
241         ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
242
243         cb_reads = []
244
245         def cb(channel, condition, data):
246             self.assertEqual(channel, ch)
247             self.assertEqual(condition, GLib.IOCondition.IN)
248             self.assertEqual(data, 'hello')
249             cb_reads.append(channel.read())
250             return True
251
252         ml = GLib.MainLoop()
253         # io_add_watch() method is deprecated, use GLib.io_add_watch
254         with warnings.catch_warnings(record=True) as warn:
255             warnings.simplefilter('always')
256             id = ch.add_watch(GLib.IOCondition.IN, cb, 'hello', priority=GLib.PRIORITY_HIGH)
257             self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
258
259         self.assertEqual(ml.get_context().find_source_by_id(id).priority,
260                          GLib.PRIORITY_HIGH)
261
262         GLib.timeout_add(10, lambda: os.write(w, b'a') and False)
263         GLib.timeout_add(100, lambda: os.write(w, b'b') and False)
264         GLib.timeout_add(200, ml.quit)
265         ml.run()
266
267         self.assertEqual(cb_reads, [b'a', b'b'])
268
269     def test_add_watch_no_data(self):
270         (r, w) = os.pipe()
271
272         ch = GLib.IOChannel(filedes=r)
273         ch.set_encoding(None)
274         ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
275
276         cb_reads = []
277
278         def cb(channel, condition):
279             self.assertEqual(channel, ch)
280             self.assertEqual(condition, GLib.IOCondition.IN)
281             cb_reads.append(channel.read())
282             return True
283
284         id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN, cb)
285
286         ml = GLib.MainLoop()
287         self.assertEqual(ml.get_context().find_source_by_id(id).priority,
288                          GLib.PRIORITY_HIGH)
289         GLib.timeout_add(10, lambda: os.write(w, b'a') and False)
290         GLib.timeout_add(100, lambda: os.write(w, b'b') and False)
291         GLib.timeout_add(200, ml.quit)
292         ml.run()
293
294         self.assertEqual(cb_reads, [b'a', b'b'])
295
296     def test_add_watch_with_data(self):
297         (r, w) = os.pipe()
298
299         ch = GLib.IOChannel(filedes=r)
300         ch.set_encoding(None)
301         ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
302
303         cb_reads = []
304
305         def cb(channel, condition, data):
306             self.assertEqual(channel, ch)
307             self.assertEqual(condition, GLib.IOCondition.IN)
308             self.assertEqual(data, 'hello')
309             cb_reads.append(channel.read())
310             return True
311
312         id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN, cb, 'hello')
313
314         ml = GLib.MainLoop()
315         self.assertEqual(ml.get_context().find_source_by_id(id).priority,
316                          GLib.PRIORITY_HIGH)
317         GLib.timeout_add(10, lambda: os.write(w, b'a') and False)
318         GLib.timeout_add(100, lambda: os.write(w, b'b') and False)
319         GLib.timeout_add(200, ml.quit)
320         ml.run()
321
322         self.assertEqual(cb_reads, [b'a', b'b'])
323
324     def test_add_watch_with_multi_data(self):
325         (r, w) = os.pipe()
326
327         ch = GLib.IOChannel(filedes=r)
328         ch.set_encoding(None)
329         ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
330
331         cb_reads = []
332
333         def cb(channel, condition, data1, data2, data3):
334             self.assertEqual(channel, ch)
335             self.assertEqual(condition, GLib.IOCondition.IN)
336             self.assertEqual(data1, 'a')
337             self.assertEqual(data2, 'b')
338             self.assertEqual(data3, 'c')
339             cb_reads.append(channel.read())
340             return True
341
342         id = GLib.io_add_watch(ch, GLib.PRIORITY_HIGH, GLib.IOCondition.IN, cb,
343                                'a', 'b', 'c')
344
345         ml = GLib.MainLoop()
346         self.assertEqual(ml.get_context().find_source_by_id(id).priority,
347                          GLib.PRIORITY_HIGH)
348         GLib.timeout_add(10, lambda: os.write(w, b'a') and False)
349         GLib.timeout_add(100, lambda: os.write(w, b'b') and False)
350         GLib.timeout_add(200, ml.quit)
351         ml.run()
352
353         self.assertEqual(cb_reads, [b'a', b'b'])
354
355     def test_deprecated_add_watch_no_data(self):
356         (r, w) = os.pipe()
357
358         ch = GLib.IOChannel(filedes=r)
359         ch.set_encoding(None)
360         ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
361
362         cb_reads = []
363
364         def cb(channel, condition):
365             self.assertEqual(channel, ch)
366             self.assertEqual(condition, GLib.IOCondition.IN)
367             cb_reads.append(channel.read())
368             return True
369
370         with warnings.catch_warnings(record=True) as warn:
371             warnings.simplefilter('always')
372             id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb, priority=GLib.PRIORITY_HIGH)
373             self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
374
375         ml = GLib.MainLoop()
376         self.assertEqual(ml.get_context().find_source_by_id(id).priority,
377                          GLib.PRIORITY_HIGH)
378         GLib.timeout_add(10, lambda: os.write(w, b'a') and False)
379         GLib.timeout_add(100, lambda: os.write(w, b'b') and False)
380         GLib.timeout_add(200, ml.quit)
381         ml.run()
382
383         self.assertEqual(cb_reads, [b'a', b'b'])
384
385     def test_deprecated_add_watch_with_data(self):
386         (r, w) = os.pipe()
387
388         ch = GLib.IOChannel(filedes=r)
389         ch.set_encoding(None)
390         ch.set_flags(ch.get_flags() | GLib.IOFlags.NONBLOCK)
391
392         cb_reads = []
393
394         def cb(channel, condition, data):
395             self.assertEqual(channel, ch)
396             self.assertEqual(condition, GLib.IOCondition.IN)
397             self.assertEqual(data, 'hello')
398             cb_reads.append(channel.read())
399             return True
400
401         with warnings.catch_warnings(record=True) as warn:
402             warnings.simplefilter('always')
403             id = GLib.io_add_watch(ch, GLib.IOCondition.IN, cb, 'hello',
404                                    priority=GLib.PRIORITY_HIGH)
405             self.assertTrue(issubclass(warn[0].category, PyGIDeprecationWarning))
406
407         ml = GLib.MainLoop()
408         self.assertEqual(ml.get_context().find_source_by_id(id).priority,
409                          GLib.PRIORITY_HIGH)
410         GLib.timeout_add(10, lambda: os.write(w, b'a') and False)
411         GLib.timeout_add(100, lambda: os.write(w, b'b') and False)
412         GLib.timeout_add(200, ml.quit)
413         ml.run()
414
415         self.assertEqual(cb_reads, [b'a', b'b'])
416
417     def test_backwards_compat_flags(self):
418         self.assertEqual(GLib.IOCondition.IN, GLib.IO_IN)
419         self.assertEqual(GLib.IOFlags.NONBLOCK, GLib.IO_FLAG_NONBLOCK)
420         self.assertEqual(GLib.IOFlags.IS_SEEKABLE, GLib.IO_FLAG_IS_SEEKABLE)
421         self.assertEqual(GLib.IOStatus.NORMAL, GLib.IO_STATUS_NORMAL)
422
423 if __name__ == '__main__':
424     unittest.main()