2 # Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
3 # Use of this source code is governed by a BSD-style license that can be
4 # found in the LICENSE file.
6 """Unit tests for the table module."""
8 from __future__ import print_function
15 sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(
16 os.path.abspath(__file__)))))
17 from chromite.lib import cros_test_lib
18 from chromite.lib import osutils
19 from chromite.lib import table
21 # pylint: disable=W0212,R0904
22 class TableTest(cros_test_lib.TestCase):
23 """Unit tests for the Table class."""
29 COLUMNS = [COL0, COL1, COL2, COL3]
31 ROW0 = {COL0: 'Xyz', COL1: 'Bcd', COL2: 'Cde'}
32 ROW1 = {COL0: 'Abc', COL1: 'Bcd', COL2: 'Opq', COL3: 'Foo'}
33 ROW2 = {COL0: 'Abc', COL1: 'Nop', COL2: 'Wxy', COL3: 'Bar'}
35 EXTRAROW = {COL1: 'Walk', COL2: 'The', COL3: 'Line'}
37 ROW0a = {COL0: 'Xyz', COL1: 'Bcd', COL2: 'Cde', COL3: 'Yay'}
38 ROW0b = {COL0: 'Xyz', COL1: 'Bcd', COL2: 'Cde', COL3: 'Boo'}
39 ROW1a = {COL0: 'Abc', COL1: 'Bcd', COL2: 'Opq', COL3: 'Blu'}
42 EXTRACOLUMNS = [COL0, EXTRACOL, COL1, COL2]
44 EROW0 = {COL0: 'Xyz', EXTRACOL: 'Yay', COL1: 'Bcd', COL2: 'Cde'}
45 EROW1 = {COL0: 'Abc', EXTRACOL: 'Hip', COL1: 'Bcd', COL2: 'Opq'}
46 EROW2 = {COL0: 'Abc', EXTRACOL: 'Yay', COL1: 'Nop', COL2: 'Wxy'}
48 def _GetRowValsInOrder(self, row):
49 """Take |row| dict and return correctly ordered values in a list."""
51 for col in self.COLUMNS:
52 vals.append(row.get(col, ""))
56 def _GetFullRowFor(self, row, cols):
57 return dict((col, row.get(col, '')) for col in cols)
59 def assertRowsEqual(self, row1, row2):
60 # Determine column superset
61 cols = set(row1.keys() + row2.keys())
62 self.assertEquals(self._GetFullRowFor(row1, cols),
63 self._GetFullRowFor(row2, cols))
65 def assertRowListsEqual(self, rows1, rows2):
66 for (row1, row2) in zip(rows1, rows2):
67 self.assertRowsEqual(row1, row2)
70 self._table = self._CreateTableWithRows(self.COLUMNS,
71 [self.ROW0, self.ROW1, self.ROW2])
73 def _CreateTableWithRows(self, cols, rows):
74 mytable = table.Table(list(cols))
77 mytable.AppendRow(dict(row))
81 self.assertEquals(3, len(self._table))
83 def testGetNumRows(self):
84 self.assertEquals(3, self._table.GetNumRows())
86 def testGetNumColumns(self):
87 self.assertEquals(4, self._table.GetNumColumns())
89 def testGetColumns(self):
90 self.assertEquals(self.COLUMNS, self._table.GetColumns())
92 def testGetColumnIndex(self):
93 self.assertEquals(0, self._table.GetColumnIndex(self.COL0))
94 self.assertEquals(1, self._table.GetColumnIndex(self.COL1))
95 self.assertEquals(2, self._table.GetColumnIndex(self.COL2))
97 def testGetColumnByIndex(self):
98 self.assertEquals(self.COL0, self._table.GetColumnByIndex(0))
99 self.assertEquals(self.COL1, self._table.GetColumnByIndex(1))
100 self.assertEquals(self.COL2, self._table.GetColumnByIndex(2))
102 def testGetByIndex(self):
103 self.assertRowsEqual(self.ROW0, self._table.GetRowByIndex(0))
104 self.assertRowsEqual(self.ROW0, self._table[0])
106 self.assertRowsEqual(self.ROW2, self._table.GetRowByIndex(2))
107 self.assertRowsEqual(self.ROW2, self._table[2])
110 self.assertRowListsEqual([self.ROW0, self.ROW1], self._table[0:2])
111 self.assertRowListsEqual([self.ROW2], self._table[-1:])
113 def testGetByValue(self):
114 rows = self._table.GetRowsByValue({self.COL0: 'Abc'})
115 self.assertEquals([self.ROW1, self.ROW2], rows)
116 rows = self._table.GetRowsByValue({self.COL2: 'Opq'})
117 self.assertEquals([self.ROW1], rows)
118 rows = self._table.GetRowsByValue({self.COL3: 'Foo'})
119 self.assertEquals([self.ROW1], rows)
121 def testGetIndicesByValue(self):
122 indices = self._table.GetRowIndicesByValue({self.COL0: 'Abc'})
123 self.assertEquals([1, 2], indices)
124 indices = self._table.GetRowIndicesByValue({self.COL2: 'Opq'})
125 self.assertEquals([1], indices)
126 indices = self._table.GetRowIndicesByValue({self.COL3: 'Foo'})
127 self.assertEquals([1], indices)
129 def testAppendRowDict(self):
130 self._table.AppendRow(self.EXTRAROW)
131 self.assertEquals(4, self._table.GetNumRows())
132 self.assertEquals(self.EXTRAROW, self._table[len(self._table) - 1])
134 def testAppendRowList(self):
135 self._table.AppendRow(self._GetRowValsInOrder(self.EXTRAROW))
136 self.assertEquals(4, self._table.GetNumRows())
137 self.assertEquals(self.EXTRAROW, self._table[len(self._table) - 1])
139 def testSetRowDictByIndex(self):
140 self._table.SetRowByIndex(1, self.EXTRAROW)
141 self.assertEquals(3, self._table.GetNumRows())
142 self.assertEquals(self.EXTRAROW, self._table[1])
144 def testSetRowListByIndex(self):
145 self._table.SetRowByIndex(1, self._GetRowValsInOrder(self.EXTRAROW))
146 self.assertEquals(3, self._table.GetNumRows())
147 self.assertEquals(self.EXTRAROW, self._table[1])
149 def testRemoveRowByIndex(self):
150 self._table.RemoveRowByIndex(1)
151 self.assertEquals(2, self._table.GetNumRows())
152 self.assertEquals(self.ROW2, self._table[1])
154 def testRemoveRowBySlice(self):
156 self.assertEquals(1, self._table.GetNumRows())
157 self.assertEquals(self.ROW2, self._table[0])
159 def testIteration(self):
161 for row in self._table:
162 self.assertEquals(row, self._table[ix])
167 self.assertEquals(0, len(self._table))
169 def testMergeRows(self):
170 # This merge should fail without a merge rule. Capture stderr to avoid
171 # scary error message in test output.
173 sys.stderr = cStringIO.StringIO()
174 self.assertRaises(ValueError, self._table._MergeRow, self.ROW0a, self.COL0)
177 # Merge but stick with current row where different.
178 self._table._MergeRow(self.ROW0a, self.COL0,
179 merge_rules = { self.COL3: 'accept_this_val' })
180 self.assertEquals(3, len(self._table))
181 self.assertRowsEqual(self.ROW0, self._table[0])
183 # Merge and use new row where different.
184 self._table._MergeRow(self.ROW0a, self.COL0,
185 merge_rules = { self.COL3: 'accept_other_val' })
186 self.assertEquals(3, len(self._table))
187 self.assertRowsEqual(self.ROW0a, self._table[0])
189 # Merge and combine column values where different
190 self._table._MergeRow(self.ROW1a, self.COL2,
191 merge_rules = { self.COL3: 'join_with: ' })
192 self.assertEquals(3, len(self._table))
193 final_row = dict(self.ROW1a)
194 final_row[self.COL3] = self.ROW1[self.COL3] + ' ' + self.ROW1a[self.COL3]
195 self.assertRowsEqual(final_row, self._table[1])
197 def testMergeTablesSameCols(self):
198 other_table = self._CreateTableWithRows(self.COLUMNS,
199 [self.ROW0b, self.ROW1a, self.ROW2])
201 self._table.MergeTable(other_table, self.COL2,
202 merge_rules = { self.COL3: 'join_with: ' })
204 final_row0 = self.ROW0b
205 final_row1 = dict(self.ROW1a)
206 final_row1[self.COL3] = self.ROW1[self.COL3] + ' ' + self.ROW1a[self.COL3]
207 final_row2 = self.ROW2
208 self.assertRowsEqual(final_row0, self._table[0])
209 self.assertRowsEqual(final_row1, self._table[1])
210 self.assertRowsEqual(final_row2, self._table[2])
212 def testMergeTablesNewCols(self):
213 self.assertFalse(self._table.HasColumn(self.EXTRACOL))
215 other_rows = [self.EROW0, self.EROW1, self.EROW2]
216 other_table = self._CreateTableWithRows(self.EXTRACOLUMNS, other_rows)
218 self._table.MergeTable(other_table, self.COL2,
219 allow_new_columns=True,
220 merge_rules = { self.COL3: 'join_by_space' })
222 self.assertTrue(self._table.HasColumn(self.EXTRACOL))
223 self.assertEquals(5, self._table.GetNumColumns())
224 self.assertEquals(1, self._table.GetColumnIndex(self.EXTRACOL))
226 final_row0 = dict(self.ROW0)
227 final_row0[self.EXTRACOL] = self.EROW0[self.EXTRACOL]
228 final_row1 = dict(self.ROW1)
229 final_row1[self.EXTRACOL] = self.EROW1[self.EXTRACOL]
230 final_row2 = dict(self.ROW2)
231 final_row2[self.EXTRACOL] = self.EROW2[self.EXTRACOL]
232 self.assertRowsEqual(final_row0, self._table[0])
233 self.assertRowsEqual(final_row1, self._table[1])
234 self.assertRowsEqual(final_row2, self._table[2])
237 self.assertRowsEqual(self.ROW0, self._table[0])
238 self.assertRowsEqual(self.ROW1, self._table[1])
239 self.assertRowsEqual(self.ROW2, self._table[2])
242 self._table.Sort(lambda row : row[self.COL3])
244 self.assertEquals(3, len(self._table))
245 self.assertRowsEqual(self.ROW0, self._table[0])
246 self.assertRowsEqual(self.ROW2, self._table[1])
247 self.assertRowsEqual(self.ROW1, self._table[2])
249 # Reverse sort by COL3
250 self._table.Sort(lambda row : row[self.COL3], reverse=True)
252 self.assertEquals(3, len(self._table))
253 self.assertRowsEqual(self.ROW1, self._table[0])
254 self.assertRowsEqual(self.ROW2, self._table[1])
255 self.assertRowsEqual(self.ROW0, self._table[2])
258 """Test multiple key sort."""
259 self.assertRowsEqual(self.ROW0, self._table[0])
260 self.assertRowsEqual(self.ROW1, self._table[1])
261 self.assertRowsEqual(self.ROW2, self._table[2])
263 # Sort by COL0 then COL1
265 return (row[self.COL0], row[self.COL1])
266 self._table.Sort(sorter)
268 self.assertEquals(3, len(self._table))
269 self.assertRowsEqual(self.ROW1, self._table[0])
270 self.assertRowsEqual(self.ROW2, self._table[1])
271 self.assertRowsEqual(self.ROW0, self._table[2])
274 self._table.Sort(sorter, reverse=True)
276 self.assertEquals(3, len(self._table))
277 self.assertRowsEqual(self.ROW0, self._table[0])
278 self.assertRowsEqual(self.ROW2, self._table[1])
279 self.assertRowsEqual(self.ROW1, self._table[2])
281 def testSplitCSVLine(self):
282 """Test splitting of csv line."""
283 tests = {'a,b,c,d': ['a', 'b', 'c', 'd'],
284 'a, b, c, d': ['a', ' b', ' c', ' d'],
285 'a,b,c,': ['a', 'b', 'c', ''],
286 'a,"b c",d': ['a', 'b c', 'd'],
287 'a,"b, c",d': ['a', 'b, c', 'd'],
288 'a,"b, c, d",e': ['a', 'b, c, d', 'e'],
289 'a,"""b, c""",d': ['a', '"b, c"', 'd'],
290 'a,"""b, c"", d",e': ['a', '"b, c", d', 'e'],
292 # Following not real Google Spreadsheet cases.
293 'a,b\,c,d': ['a', 'b,c', 'd'],
294 'a,",c': ['a', '",c'],
295 'a,"",c': ['a', '', 'c'],
298 vals = table.Table._SplitCSVLine(line)
299 self.assertEquals(vals, tests[line])
301 @osutils.TempDirDecorator
302 def testWriteReadCSV(self):
303 """Write and Read CSV and verify contents preserved."""
304 # This also tests the Table == and != operators.
305 _, path = tempfile.mkstemp(text=True)
306 tmpfile = open(path, 'w')
307 self._table.WriteCSV(tmpfile)
309 mytable = table.Table.LoadFromCSV(path)
310 self.assertEquals(mytable, self._table)
311 self.assertFalse(mytable != self._table)
313 def testInsertColumn(self):
314 self._table.InsertColumn(1, self.EXTRACOL, 'blah')
315 goldenrow = dict(self.ROW1)
316 goldenrow[self.EXTRACOL] = 'blah'
317 self.assertRowsEqual(goldenrow, self._table.GetRowByIndex(1))
318 self.assertEquals(self.EXTRACOL, self._table.GetColumnByIndex(1))
320 def testAppendColumn(self):
321 self._table.AppendColumn(self.EXTRACOL, 'blah')
322 goldenrow = dict(self.ROW1)
323 goldenrow[self.EXTRACOL] = 'blah'
324 self.assertRowsEqual(goldenrow, self._table.GetRowByIndex(1))
325 col_size = self._table.GetNumColumns()
326 self.assertEquals(self.EXTRACOL, self._table.GetColumnByIndex(col_size - 1))
328 def testProcessRows(self):
330 row[self.COL0] = row[self.COL0] + " processed"
331 self._table.ProcessRows(Processor)
333 final_row0 = dict(self.ROW0)
334 final_row0[self.COL0] += " processed"
335 final_row1 = dict(self.ROW1)
336 final_row1[self.COL0] += " processed"
337 final_row2 = dict(self.ROW2)
338 final_row2[self.COL0] += " processed"
339 self.assertRowsEqual(final_row0, self._table[0])
340 self.assertRowsEqual(final_row1, self._table[1])
341 self.assertRowsEqual(final_row2, self._table[2])
343 if __name__ == "__main__":