3 # Copyright (c) 2013 Intel, Inc.
5 # This program is free software; you can redistribute it and/or modify it
6 # under the terms of the GNU General Public License as published by the Free
7 # Software Foundation; version 2 of the License
9 # This program is distributed in the hope that it will be useful, but
10 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 # You should have received a copy of the GNU General Public License along
15 # with this program; if not, write to the Free Software Foundation, Inc., 59
16 # Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18 """ This module implements a simple GPT partitions parser which can read the
19 GPT header and the GPT partition table. """
24 from mic.utils.errors import MountError
26 _GPT_HEADER_FORMAT = "<8s4sIIIQQQQ16sQIII"
27 _GPT_HEADER_SIZE = struct.calcsize(_GPT_HEADER_FORMAT)
28 _GPT_ENTRY_FORMAT = "<16s16sQQQ72s"
29 _GPT_ENTRY_SIZE = struct.calcsize(_GPT_ENTRY_FORMAT)
30 _SUPPORTED_GPT_REVISION = '\x00\x00\x01\x00'
32 def _stringify_uuid(binary_uuid):
33 """ A small helper function to transform a binary UUID into a string
36 uuid_str = str(uuid.UUID(bytes_le = binary_uuid))
38 return uuid_str.upper()
40 def _calc_header_crc(raw_hdr):
41 """ Calculate GPT header CRC32 checksum. The 'raw_hdr' parameter has to
42 be a list or a tuple containing all the elements of the GPT header in a
43 "raw" form, meaning that it should simply contain "unpacked" disk data.
46 raw_hdr = list(raw_hdr)
48 raw_hdr = struct.pack(_GPT_HEADER_FORMAT, *raw_hdr)
50 return binascii.crc32(raw_hdr) & 0xFFFFFFFF
52 def _validate_header(raw_hdr):
53 """ Validate the GPT header. The 'raw_hdr' parameter has to be a list or a
54 tuple containing all the elements of the GPT header in a "raw" form,
55 meaning that it should simply contain "unpacked" disk data. """
57 # Validate the signature
58 if raw_hdr[0] != 'EFI PART':
59 raise MountError("GPT partition table not found")
61 # Validate the revision
62 if raw_hdr[1] != _SUPPORTED_GPT_REVISION:
63 raise MountError("Unsupported GPT revision '%s', supported revision " \
65 (binascii.hexlify(raw_hdr[1]),
66 binascii.hexlify(_SUPPORTED_GPT_REVISION)))
68 # Validate header size
69 if raw_hdr[2] != _GPT_HEADER_SIZE:
70 raise MountError("Bad GPT header size: %d bytes, expected %d" % \
71 (raw_hdr[2], _GPT_HEADER_SIZE))
73 crc = _calc_header_crc(raw_hdr)
75 raise MountError("GPT header crc mismatch: %#x, should be %#x" % \
79 """ GPT partition table parser. Allows reading the GPT header and the
80 partition table, as well as modifying the partition table records. """
82 def __init__(self, disk_path, sector_size = 512):
83 """ The class constructor which accepts the following parameters:
84 * disk_path - full path to the disk image or device node
85 * sector_size - size of a disk sector in bytes """
87 self.sector_size = sector_size
88 self.disk_path = disk_path
91 self._disk_obj = open(disk_path, 'r+b')
92 except IOError as err:
93 raise MountError("Cannot open file '%s' for reading GPT " \
94 "partitions: %s" % (disk_path, err))
97 """ The class destructor. """
99 self._disk_obj.close()
101 def _read_disk(self, offset, size):
102 """ A helper function which reads 'size' bytes from offset 'offset' of
103 the disk and checks all the error conditions. """
105 self._disk_obj.seek(offset)
107 data = self._disk_obj.read(size)
108 except IOError as err:
109 raise MountError("cannot read from '%s': %s" % \
110 (self.disk_path, err))
112 if len(data) != size:
113 raise MountError("cannot read %d bytes from offset '%d' of '%s', " \
114 "read only %d bytes" % \
115 (size, offset, self.disk_path, len(data)))
119 def _write_disk(self, offset, buf):
120 """ A helper function which writes buffer 'buf' to offset 'offset' of
121 the disk. This function takes care of unaligned writes and checks all
122 the error conditions. """
124 # Since we may be dealing with a block device, we only can write in
125 # 'self.sector_size' chunks. Find the aligned starting and ending
126 # disk offsets to read.
127 start = (offset / self.sector_size) * self.sector_size
128 end = ((start + len(buf)) / self.sector_size + 1) * self.sector_size
130 data = self._read_disk(start, end - start)
132 data = data[:off] + buf + data[off + len(buf):]
134 self._disk_obj.seek(start)
136 self._disk_obj.write(data)
137 except IOError as err:
138 raise MountError("cannot write to '%s': %s" % (self.disk_path, err))
140 def read_header(self, primary = True):
141 """ Read and verify the GPT header and return a dictionary containing
142 the following elements:
144 'signature' : header signature
145 'revision' : header revision
146 'hdr_size' : header size in bytes
147 'hdr_crc' : header CRC32
148 'hdr_lba' : LBA of this header
149 'hdr_offs' : byte disk offset of this header
150 'backup_lba' : backup header LBA
151 'backup_offs' : byte disk offset of backup header
152 'first_lba' : first usable LBA for partitions
153 'first_offs' : first usable byte disk offset for partitions
154 'last_lba' : last usable LBA for partitions
155 'last_offs' : last usable byte disk offset for partitions
156 'disk_uuid' : UUID of the disk
157 'ptable_lba' : starting LBA of array of partition entries
158 'ptable_offs' : disk byte offset of the start of the partition table
159 'ptable_size' : partition table size in bytes
160 'entries_cnt' : number of available partition table entries
161 'entry_size' : size of a single partition entry
162 'ptable_crc' : CRC32 of the partition table
163 'primary' : a boolean, if 'True', this is the primary GPT header,
164 if 'False' - the secondary
165 'primary_str' : contains string "primary" if this is the primary GPT
166 header, and "backup" otherwise
168 This dictionary corresponds to the GPT header format. Please, see the
169 UEFI standard for the description of these fields.
171 If the 'primary' parameter is 'True', the primary GPT header is read,
172 otherwise the backup GPT header is read instead. """
174 # Read and validate the primary GPT header
175 raw_hdr = self._read_disk(self.sector_size, _GPT_HEADER_SIZE)
176 raw_hdr = struct.unpack(_GPT_HEADER_FORMAT, raw_hdr)
177 _validate_header(raw_hdr)
178 primary_str = "primary"
181 # Read and validate the backup GPT header
182 raw_hdr = self._read_disk(raw_hdr[6] * self.sector_size, _GPT_HEADER_SIZE)
183 raw_hdr = struct.unpack(_GPT_HEADER_FORMAT, raw_hdr)
184 _validate_header(raw_hdr)
185 primary_str = "backup"
187 return { 'signature' : raw_hdr[0],
188 'revision' : raw_hdr[1],
189 'hdr_size' : raw_hdr[2],
190 'hdr_crc' : raw_hdr[3],
191 'hdr_lba' : raw_hdr[5],
192 'hdr_offs' : raw_hdr[5] * self.sector_size,
193 'backup_lba' : raw_hdr[6],
194 'backup_offs' : raw_hdr[6] * self.sector_size,
195 'first_lba' : raw_hdr[7],
196 'first_offs' : raw_hdr[7] * self.sector_size,
197 'last_lba' : raw_hdr[8],
198 'last_offs' : raw_hdr[8] * self.sector_size,
199 'disk_uuid' :_stringify_uuid(raw_hdr[9]),
200 'ptable_lba' : raw_hdr[10],
201 'ptable_offs' : raw_hdr[10] * self.sector_size,
202 'ptable_size' : raw_hdr[11] * raw_hdr[12],
203 'entries_cnt' : raw_hdr[11],
204 'entry_size' : raw_hdr[12],
205 'ptable_crc' : raw_hdr[13],
207 'primary_str' : primary_str }
209 def _read_raw_ptable(self, header):
210 """ Read and validate primary or backup partition table. The 'header'
211 argument is the GPT header. If it is the primary GPT header, then the
212 primary partition table is read and validated, otherwise - the backup
213 one. The 'header' argument is a dictionary which is returned by the
214 'read_header()' method. """
216 raw_ptable = self._read_disk(header['ptable_offs'],
217 header['ptable_size'])
219 crc = binascii.crc32(raw_ptable) & 0xFFFFFFFF
220 if crc != header['ptable_crc']:
221 raise MountError("Partition table at LBA %d (%s) is corrupted" % \
222 (header['ptable_lba'], header['primary_str']))
226 def get_partitions(self, primary = True):
227 """ This is a generator which parses the GPT partition table and
228 generates the following dictionary for each partition:
230 'index' : the index of the partition table endry
231 'offs' : byte disk offset of the partition table entry
232 'type_uuid' : partition type UUID
233 'part_uuid' : partition UUID
234 'first_lba' : the first LBA
235 'last_lba' : the last LBA
236 'flags' : attribute flags
237 'name' : partition name
238 'primary' : a boolean, if 'True', this is the primary partition
239 table, if 'False' - the secondary
240 'primary_str' : contains string "primary" if this is the primary GPT
241 header, and "backup" otherwise
243 This dictionary corresponds to the GPT header format. Please, see the
244 UEFI standard for the description of these fields.
246 If the 'primary' parameter is 'True', partitions from the primary GPT
247 partition table are generated, otherwise partitions from the backup GPT
248 partition table are generated. """
251 primary_str = "primary"
253 primary_str = "backup"
255 header = self.read_header(primary)
256 raw_ptable = self._read_raw_ptable(header)
258 for index in xrange(0, header['entries_cnt']):
259 start = header['entry_size'] * index
260 end = start + header['entry_size']
261 raw_entry = struct.unpack(_GPT_ENTRY_FORMAT, raw_ptable[start:end])
263 if raw_entry[2] == 0 or raw_entry[3] == 0:
266 part_name = str(raw_entry[5].decode('UTF-16').split('\0', 1)[0])
268 yield { 'index' : index,
269 'offs' : header['ptable_offs'] + start,
270 'type_uuid' : _stringify_uuid(raw_entry[0]),
271 'part_uuid' : _stringify_uuid(raw_entry[1]),
272 'first_lba' : raw_entry[2],
273 'last_lba' : raw_entry[3],
274 'flags' : raw_entry[4],
277 'primary_str' : primary_str }
279 def _change_partition(self, header, entry):
280 """ A helper function for 'change_partitions()' which changes a
281 a paricular instance of the partition table (primary or backup). """
283 if entry['index'] >= header['entries_cnt']:
284 raise MountError("Partition table at LBA %d has only %d " \
285 "records cannot change record number " % \
286 (header['entries_cnt'], entry['index']))
287 # Read raw GPT header
288 raw_hdr = self._read_disk(header['hdr_offs'], _GPT_HEADER_SIZE)
289 raw_hdr = list(struct.unpack(_GPT_HEADER_FORMAT, raw_hdr))
290 _validate_header(raw_hdr)
292 # Prepare the new partition table entry
293 raw_entry = struct.pack(_GPT_ENTRY_FORMAT,
294 uuid.UUID(entry['type_uuid']).bytes_le,
295 uuid.UUID(entry['part_uuid']).bytes_le,
299 entry['name'].encode('UTF-16LE'))
301 # Write the updated entry to the disk
302 entry_offs = header['ptable_offs'] + \
303 header['entry_size'] * entry['index']
304 self._write_disk(entry_offs, raw_entry)
306 # Calculate and update partition table CRC32
307 raw_ptable = self._read_disk(header['ptable_offs'],
308 header['ptable_size'])
309 raw_hdr[13] = binascii.crc32(raw_ptable) & 0xFFFFFFFF
311 # Calculate and update the GPT header CRC
312 raw_hdr[3] = _calc_header_crc(raw_hdr)
314 # Write the updated header to the disk
315 raw_hdr = struct.pack(_GPT_HEADER_FORMAT, *raw_hdr)
316 self._write_disk(header['hdr_offs'], raw_hdr)
318 def change_partition(self, entry):
319 """ Change a GPT partition. The 'entry' argument has the same format as
320 'get_partitions()' returns. This function simply changes the partition
321 table record corresponding to 'entry' in both, the primary and the
322 backup GPT partition tables. The parition table CRC is re-calculated
323 and the GPT headers are modified accordingly. """
325 # Change the primary partition table
326 header = self.read_header(True)
327 self._change_partition(header, entry)
329 # Change the backup partition table
330 header = self.read_header(False)
331 self._change_partition(header, entry)