5 # Copyright (c) 2020 Project CHIP Authors
6 # Copyright (c) 2019-2020 Google LLC.
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
24 # This file contains definitions for working with data encoded in Chip TLV format
from __future__ import absolute_import
from __future__ import print_function

import struct
from collections import OrderedDict

try:
    from collections.abc import Mapping, Sequence
except ImportError:  # Python 2 keeps the ABCs directly in `collections`
    from collections import Mapping, Sequence
# TLV element type codes (low five bits of the control byte).  For integers,
# floats and strings the value is the base code of the type; the low bits then
# select the width of the value or of the length field (see ElementTypes).
TLV_TYPE_SIGNED_INTEGER = 0x00
TLV_TYPE_UNSIGNED_INTEGER = 0x04
TLV_TYPE_BOOLEAN = 0x08
TLV_TYPE_FLOATING_POINT_NUMBER = 0x0A
TLV_TYPE_UTF8_STRING = 0x0C
TLV_TYPE_BYTE_STRING = 0x10
TLV_TYPE_NULL = 0x14
TLV_TYPE_STRUCTURE = 0x15
TLV_TYPE_ARRAY = 0x16
TLV_TYPE_PATH = 0x17

# Tag control codes (high three bits of the control byte).
TLV_TAG_CONTROL_ANONYMOUS = 0x00
TLV_TAG_CONTROL_CONTEXT_SPECIFIC = 0x20
TLV_TAG_CONTROL_COMMON_PROFILE_2Bytes = 0x40
TLV_TAG_CONTROL_COMMON_PROFILE_4Bytes = 0x60
TLV_TAG_CONTROL_IMPLICIT_PROFILE_2Bytes = 0x80
TLV_TAG_CONTROL_IMPLICIT_PROFILE_4Bytes = 0xA0
TLV_TAG_CONTROL_FULLY_QUALIFIED_6Bytes = 0xC0
TLV_TAG_CONTROL_FULLY_QUALIFIED_8Bytes = 0xE0

# Booleans carry their value in the element type itself.
TLVBoolean_False = TLV_TYPE_BOOLEAN
TLVBoolean_True = TLV_TYPE_BOOLEAN + 1

TLVEndOfContainer = 0x18

INT8_MIN = -128
INT16_MIN = -32768
INT32_MIN = -2147483648
INT64_MIN = -9223372036854775808

INT8_MAX = 127
INT16_MAX = 32767
INT32_MAX = 2147483647
INT64_MAX = 9223372036854775807

UINT8_MAX = 255
UINT16_MAX = 65535
UINT32_MAX = 4294967295
UINT64_MAX = 18446744073709551615

# Human-readable name of every element type code; TLVReader dispatches on
# these strings.
ElementTypes = {
    0x00: "Signed Integer 1-byte value",
    0x01: "Signed Integer 2-byte value",
    0x02: "Signed Integer 4-byte value",
    0x03: "Signed Integer 8-byte value",
    0x04: "Unsigned Integer 1-byte value",
    0x05: "Unsigned Integer 2-byte value",
    0x06: "Unsigned Integer 4-byte value",
    0x07: "Unsigned Integer 8-byte value",
    0x08: "Boolean False",
    0x09: "Boolean True",
    0x0A: "Floating Point 4-byte value",
    0x0B: "Floating Point 8-byte value",
    0x0C: "UTF-8 String 1-byte length",
    0x0D: "UTF-8 String 2-byte length",
    0x0E: "UTF-8 String 4-byte length",
    0x0F: "UTF-8 String 8-byte length",
    0x10: "Byte String 1-byte length",
    0x11: "Byte String 2-byte length",
    0x12: "Byte String 4-byte length",
    0x13: "Byte String 8-byte length",
    0x14: "Null",
    0x15: "Structure",
    0x16: "Array",
    0x17: "Path",
    0x18: "End of Collection",
}

# Human-readable name of every tag control code.
TagControls = {
    0x00: "Anonymous",
    0x20: "Context 1-byte",
    0x40: "Common Profile 2-byte",
    0x60: "Common Profile 4-byte",
    0x80: "Implicit Profile 2-byte",
    0xA0: "Implicit Profile 4-byte",
    0xC0: "Fully Qualified 6-byte",
    0xE0: "Fully Qualified 8-byte",
}
115 class TLVWriter(object):
116 def __init__(self, encoding=None, implicitProfile=None):
117 self._encoding = encoding if encoding is not None else bytearray()
118 self._implicitProfile = implicitProfile
119 self._containerStack = []
@property
def encoding(self):
    """The object into which encoded TLV data is written.

    By default this is a bytearray object.
    """
    return self._encoding

@encoding.setter
def encoding(self, val):
    self._encoding = val
@property
def implicitProfile(self):
    """The Chip profile id used when encoding implicit profile tags.

    Setting this value will result in an implicit profile tag being encoded
    whenever the profile of the tag to be encoded matches the specified
    implicit profile id.

    With None (the default) no concrete profile id is written implicitly;
    only tags whose profile is itself None use the implicit form.
    """
    return self._implicitProfile

@implicitProfile.setter
def implicitProfile(self, val):
    self._implicitProfile = val
def put(self, tag, val):
    """Write a value in TLV format with the specified TLV tag.

    val can be a Python object which will be encoded as follows:
    - Python bools, floats and strings are encoded as their respective TLV types.
    - Python ints are encoded as unsigned TLV integers if zero or positive; signed TLV
      integers if negative.
    - None is encoded as a TLV Null.
    - bytes and bytearray objects are encoded as TLV byte strings.
    - Mapping-like objects (e.g. dict) are encoded as TLV structures.  The keys of the
      map object are expected to be tag values, as described below for the tag argument.
      Map values are encoded recursively, using the same rules as defined for the val
      argument.  Elements of a plain dict are encoded in tag numerical order; elements
      of any other mapping (e.g. OrderedDict) are encoded in the object's natural
      iteration order.
    - Sequence-like objects (e.g. lists) are written as TLV arrays.  Elements within
      the array are encoded recursively, using the same rules as defined for the val
      argument.

    tag can be a small int (0-255), a tuple of two integers, or None.
    If tag is an integer, it is encoded as a TLV context-specific tag.
    If tag is a two-integer tuple, it is encoded as a TLV profile-specific tag, with
    the first integer encoded as the profile id and the second as the tag number.
    If tag is None, it is encoded as a TLV anonymous tag.

    Raises ValueError if val is of an unsupported type.
    """
    if val is None:
        self.putNull(tag)
    elif isinstance(val, bool):
        # bool is a subclass of int, so it has to be tested before int.
        self.putBool(tag, val)
    elif isinstance(val, int):
        if val < 0:
            self.putSignedInt(tag, val)
        else:
            self.putUnsignedInt(tag, val)
    elif isinstance(val, float):
        self.putFloat(tag, val)
    elif isinstance(val, str):
        self.putString(tag, val)
    elif isinstance(val, (bytes, bytearray)):
        self.putBytes(tag, val)
    elif isinstance(val, Mapping):
        self.startStructure(tag)
        if type(val) is dict:
            # Exact-type test on purpose: dict subclasses such as OrderedDict
            # keep their own iteration order.
            items = sorted(val.items(), key=lambda item: tlvTagToSortKey(item[0]))
        else:
            items = val.items()
        for containedTag, containedVal in items:
            self.put(containedTag, containedVal)
        self.endContainer()
    elif isinstance(val, Sequence):
        self.startArray(tag)
        for containedVal in val:
            self.put(None, containedVal)
        self.endContainer()
    else:
        raise ValueError("Attempt to TLV encode unsupported value")
def putSignedInt(self, tag, val):
    """Write a value as a TLV signed integer with the specified TLV tag.

    The smallest of the 1, 2, 4 and 8 byte little-endian encodings that can
    hold val is used.  Raises ValueError if val does not fit in 64 bits.
    """
    if INT8_MIN <= val <= INT8_MAX:
        fmt = "<b"
    elif INT16_MIN <= val <= INT16_MAX:
        fmt = "<h"
    elif INT32_MIN <= val <= INT32_MAX:
        fmt = "<l"
    elif INT64_MIN <= val <= INT64_MAX:
        fmt = "<q"
    else:
        raise ValueError("Integer value out of range")
    encodedVal = struct.pack(fmt, val)
    # The width of the packed value selects the element-type variant.
    controlAndTag = self._encodeControlAndTag(
        TLV_TYPE_SIGNED_INTEGER, tag, lenOfLenOrVal=len(encodedVal)
    )
    self._encoding.extend(controlAndTag)
    self._encoding.extend(encodedVal)
def putUnsignedInt(self, tag, val):
    """Write a value as a TLV unsigned integer with the specified TLV tag.

    val is packed by _encodeUnsignedInt; the width of the packed value
    selects the element-type variant.
    """
    encodedVal = self._encodeUnsignedInt(val)
    controlAndTag = self._encodeControlAndTag(
        TLV_TYPE_UNSIGNED_INTEGER, tag, lenOfLenOrVal=len(encodedVal)
    )
    self._encoding.extend(controlAndTag)
    self._encoding.extend(encodedVal)
def putFloat(self, tag, val):
    """Write a value as a TLV float with the specified TLV tag.

    The value is always written as an 8-byte double.
    """
    # "<d" pins little-endian byte order; a bare "d" would follow the byte
    # order of the host and produce a different encoding on big-endian
    # machines, while every other field in this module is packed with "<".
    encodedVal = struct.pack("<d", val)
    controlAndTag = self._encodeControlAndTag(
        TLV_TYPE_FLOATING_POINT_NUMBER, tag, lenOfLenOrVal=len(encodedVal)
    )
    self._encoding.extend(controlAndTag)
    self._encoding.extend(encodedVal)
def putString(self, tag, val):
    """Write a value as a TLV string with the specified TLV tag.

    val is encoded as UTF-8 and written after a length field holding the
    number of encoded bytes.
    """
    encodedVal = val.encode("utf-8")
    valLen = self._encodeUnsignedInt(len(encodedVal))
    # For strings the element-type variant is chosen by the width of the
    # length field, not by the length of the data.
    controlAndTag = self._encodeControlAndTag(
        TLV_TYPE_UTF8_STRING, tag, lenOfLenOrVal=len(valLen)
    )
    self._encoding.extend(controlAndTag)
    self._encoding.extend(valLen)
    self._encoding.extend(encodedVal)
def putBytes(self, tag, val):
    """Write a value as a TLV byte string with the specified TLV tag.

    val is written verbatim after a length field holding len(val).
    """
    valLen = self._encodeUnsignedInt(len(val))
    # The element-type variant is chosen by the width of the length field.
    controlAndTag = self._encodeControlAndTag(
        TLV_TYPE_BYTE_STRING, tag, lenOfLenOrVal=len(valLen)
    )
    self._encoding.extend(controlAndTag)
    self._encoding.extend(valLen)
    self._encoding.extend(val)
def putBool(self, tag, val):
    """Write a value as a TLV boolean with the specified TLV tag.

    The boolean is carried by the element type itself; no value bytes follow.
    """
    if val:
        elementType = TLVBoolean_True
    else:
        elementType = TLVBoolean_False
    controlAndTag = self._encodeControlAndTag(elementType, tag)
    self._encoding.extend(controlAndTag)
def putNull(self, tag):
    """Write a TLV null element carrying the given tag."""
    self._encoding.extend(self._encodeControlAndTag(TLV_TYPE_NULL, tag))
def startContainer(self, tag, containerType):
    """Start writing a TLV container with the specified TLV tag.

    containerType can be one of TLV_TYPE_STRUCTURE, TLV_TYPE_ARRAY or
    TLV_TYPE_PATH; any other value raises ValueError.
    """
    self._verifyValidContainerType(containerType)
    # Encode the header before pushing, so that the tag is validated against
    # the enclosing container rather than the one being opened.
    controlAndTag = self._encodeControlAndTag(containerType, tag)
    self._encoding.extend(controlAndTag)
    # The innermost open container lives at index 0.
    self._containerStack.insert(0, containerType)
def startStructure(self, tag):
    """Open a TLV structure carrying the given tag."""
    self.startContainer(tag, TLV_TYPE_STRUCTURE)
def startArray(self, tag):
    """Open a TLV array carrying the given tag."""
    self.startContainer(tag, TLV_TYPE_ARRAY)
def startPath(self, tag):
    """Open a TLV path carrying the given tag."""
    self.startContainer(tag, TLV_TYPE_PATH)
def endContainer(self):
    """Close the innermost open container and write its end marker."""
    # Index 0 holds the innermost container; drop it before emitting the
    # anonymous end-of-container element.
    self._containerStack.pop(0)
    self._encoding.extend(self._encodeControlAndTag(TLVEndOfContainer, None))
def _encodeControlAndTag(self, type, tag, lenOfLenOrVal=0):
    """Return the packed control byte and tag of one TLV element.

    type is the base element type code.  lenOfLenOrVal is the width in bytes
    of the element's value (integers, floats) or length field (strings);
    widths of 2, 4 and 8 select the matching element-type variant.

    tag is None (anonymous), an int (context-specific) or a
    (profile, tagNum) tuple.  A tuple is written in implicit form when
    profile is None or equals the writer's implicit profile, in common-profile
    form when profile is 0, and fully qualified otherwise.

    Raises ValueError when the tag is malformed, out of range, or not allowed
    in the container currently open.
    """
    controlByte = type
    if lenOfLenOrVal == 2:
        controlByte |= 1
    elif lenOfLenOrVal == 4:
        controlByte |= 2
    elif lenOfLenOrVal == 8:
        controlByte |= 3

    # Index 0 of the stack is the innermost open container.
    parentType = self._containerStack[0] if self._containerStack else None

    if tag is None:
        # The end-of-container marker is anonymous even inside a structure.
        if type != TLVEndOfContainer and parentType == TLV_TYPE_STRUCTURE:
            raise ValueError("Attempt to encode anonymous tag within TLV structure")
        controlByte |= TLV_TAG_CONTROL_ANONYMOUS
        return struct.pack("<B", controlByte)

    if isinstance(tag, int):
        if tag < 0 or tag > UINT8_MAX:
            raise ValueError("Context-specific TLV tag number out of range")
        if parentType is None:
            raise ValueError(
                "Attempt to encode context-specific TLV tag at top level"
            )
        if parentType == TLV_TYPE_ARRAY:
            raise ValueError(
                "Attempt to encode context-specific tag within TLV array"
            )
        controlByte |= TLV_TAG_CONTROL_CONTEXT_SPECIFIC
        return struct.pack("<BB", controlByte, tag)

    if isinstance(tag, tuple):
        (profile, tagNum) = tag
        if not isinstance(tagNum, int):
            raise ValueError("Invalid object given for TLV tag")
        if tagNum < 0 or tagNum > UINT32_MAX:
            raise ValueError("TLV tag number out of range")
        # A profile of None is legal and means "implicit profile".
        if profile is not None:
            if not isinstance(profile, int):
                raise ValueError("Invalid object given for TLV profile id")
            if profile < 0 or profile > UINT32_MAX:
                raise ValueError("TLV profile id value out of range")
        if parentType == TLV_TYPE_ARRAY:
            raise ValueError(
                "Attempt to encode profile-specific tag within TLV array"
            )

        shortTag = tagNum <= UINT16_MAX
        if profile is None or profile == self._implicitProfile:
            if shortTag:
                controlByte |= TLV_TAG_CONTROL_IMPLICIT_PROFILE_2Bytes
                return struct.pack("<BH", controlByte, tagNum)
            controlByte |= TLV_TAG_CONTROL_IMPLICIT_PROFILE_4Bytes
            return struct.pack("<BL", controlByte, tagNum)
        if profile == 0:
            if shortTag:
                controlByte |= TLV_TAG_CONTROL_COMMON_PROFILE_2Bytes
                return struct.pack("<BH", controlByte, tagNum)
            controlByte |= TLV_TAG_CONTROL_COMMON_PROFILE_4Bytes
            return struct.pack("<BL", controlByte, tagNum)
        if shortTag:
            controlByte |= TLV_TAG_CONTROL_FULLY_QUALIFIED_6Bytes
            return struct.pack("<BLH", controlByte, profile, tagNum)
        controlByte |= TLV_TAG_CONTROL_FULLY_QUALIFIED_8Bytes
        return struct.pack("<BLL", controlByte, profile, tagNum)

    raise ValueError("Invalid object given for TLV tag")
@staticmethod
def _encodeUnsignedInt(val):
    """Pack val as the shortest little-endian unsigned integer that holds it.

    Returns 1, 2, 4 or 8 bytes.  Raises ValueError if val is negative or
    does not fit in 64 bits.
    """
    if val < 0:
        raise ValueError("Integer value out of range")
    if val <= UINT8_MAX:
        fmt = "<B"
    elif val <= UINT16_MAX:
        fmt = "<H"
    elif val <= UINT32_MAX:
        fmt = "<L"
    elif val <= UINT64_MAX:
        fmt = "<Q"
    else:
        raise ValueError("Integer value out of range")
    return struct.pack(fmt, val)
@staticmethod
def _verifyValidContainerType(containerType):
    """Raise ValueError unless containerType is the structure, array or path code."""
    if containerType not in (TLV_TYPE_STRUCTURE, TLV_TYPE_ARRAY, TLV_TYPE_PATH):
        raise ValueError("Invalid TLV container type")
406 class TLVReader(object):
407 def __init__(self, tlv):
414 return self._decodings
417 """Get the dictionary representation of tlv data"""
419 self._get(self._tlv, self._decodings, out)
def _decodeControlByte(self, tlv, decoding):
    """Decode the control byte at the current offset and step past it.

    Stores the tag control name under decoding["tagControl"] (upper three
    bits) and the element type name under decoding["type"] (lower five bits).
    A code missing from ElementTypes raises KeyError.
    """
    (controlByte,) = struct.unpack("<B", tlv[self._bytesRead : self._bytesRead + 1])
    decoding["tagControl"] = TagControls[controlByte & 0xE0]
    decoding["type"] = ElementTypes[controlByte & 0x1F]
    # Consume the control byte: the tag decoder reads from the current offset.
    self._bytesRead += 1
430 def _decodeControlAndTag(self, tlv, decoding):
431 """The control byte specifies the type of a TLV element and how its tag, length and value fields are encoded.
432 The control byte consists of two subfields: an element type field which occupies the lower 5 bits,
433 and a tag control field which occupies the upper 3 bits. The element type field encodes the element’s type
434 as well as how the corresponding length and value fields are encoded. In the case of Booleans and the
435 null value, the element type field also encodes the value itself."""
437 self._decodeControlByte(tlv, decoding)
439 if decoding["tagControl"] == "Anonymous":
440 decoding["tag"] = None
441 decoding["tagLen"] = 0
442 elif decoding["tagControl"] == "Context 1-byte":
443 (decoding["tag"],) = struct.unpack(
444 "<B", tlv[self._bytesRead : self._bytesRead + 1]
446 decoding["tagLen"] = 1
448 elif decoding["tagControl"] == "Common Profile 2-byte":
450 (tag,) = struct.unpack("<H", tlv[self._bytesRead : self._bytesRead + 2])
451 decoding["profileTag"] = (profile, tag)
452 decoding["tagLen"] = 2
454 elif decoding["tagControl"] == "Common Profile 4-byte":
456 (tag,) = struct.unpack("<L", tlv[self._bytesRead : self._bytesRead + 4])
457 decoding["profileTag"] = (profile, tag)
458 decoding["tagLen"] = 4
460 elif decoding["tagControl"] == "Implicit Profile 2-byte":
462 (tag,) = struct.unpack("<H", tlv[self._bytesRead : self._bytesRead + 2])
463 decoding["profileTag"] = (profile, tag)
464 decoding["tagLen"] = 2
466 elif decoding["tagControl"] == "Implicit Profile 4-byte":
468 (tag,) = struct.unpack("<L", tlv[self._bytesRead : self._bytesRead + 4])
469 decoding["profileTag"] = (profile, tag)
470 decoding["tagLen"] = 4
472 elif decoding["tagControl"] == "Fully Qualified 6-byte":
473 (profile,) = struct.unpack("<L", tlv[self._bytesRead : self._bytesRead + 4])
474 (tag,) = struct.unpack("<H", tlv[self._bytesRead + 4 : self._bytesRead + 6])
475 decoding["profileTag"] = (profile, tag)
476 decoding["tagLen"] = 2
478 elif decoding["tagControl"] == "Fully Qualified 8-byte":
479 (profile,) = struct.unpack("<L", tlv[self._bytesRead : self._bytesRead + 4])
480 (tag,) = struct.unpack("<L", tlv[self._bytesRead + 4 : self._bytesRead + 8])
481 decoding["profileTag"] = (profile, tag)
482 decoding["tagLen"] = 4
485 def _decodeStrLength(self, tlv, decoding):
486 """UTF-8 or Byte StringLength fields are encoded in 0, 1, 2 or 4 byte widths, as specified by
487 the element type field. If the element type needs a length field grab the next bytes as length"""
488 if "length" in decoding["type"]:
489 if "1-byte" in decoding["type"]:
490 (decoding["strDataLen"],) = struct.unpack(
491 "<B", tlv[self._bytesRead : self._bytesRead + 1]
493 decoding["strDataLenLen"] = 1
495 elif "2-byte" in decoding["type"]:
496 (decoding["strDataLen"],) = struct.unpack(
497 "<H", tlv[self._bytesRead : self._bytesRead + 2]
499 decoding["strDataLenLen"] = 2
501 elif "4-byte" in decoding["type"]:
502 (decoding["strDataLen"],) = struct.unpack(
503 "<L", tlv[self._bytesRead : self._bytesRead + 4]
505 decoding["strDataLenLen"] = 4
507 elif "8-byte" in decoding["type"]:
508 (decoding["strDataLen"],) = struct.unpack(
509 "<Q", tlv[self._bytesRead : self._bytesRead + 8]
511 decoding["strDataLenLen"] = 8
514 decoding["strDataLen"] = 0
515 decoding["strDataLenLen"] = 0
517 def _decodeVal(self, tlv, decoding):
518 """decode primitive tlv value to the corresponding python value, tlv array and path are decoded as
519 python list, tlv structure is decoded as python dictionary"""
520 if decoding["type"] == "Structure":
521 decoding["value"] = {}
522 decoding["Structure"] = []
523 self._get(tlv, decoding["Structure"], decoding["value"])
524 elif decoding["type"] == "Array":
525 decoding["value"] = []
526 decoding["Array"] = []
527 self._get(tlv, decoding["Array"], decoding["value"])
528 elif decoding["type"] == "Path":
529 decoding["value"] = []
530 decoding["Path"] = []
531 self._get(tlv, decoding["Path"], decoding["value"])
532 elif decoding["type"] == "Null":
533 decoding["value"] = None
534 elif decoding["type"] == "End of Collection":
535 decoding["value"] = None
536 elif decoding["type"] == "Boolean True":
537 decoding["value"] = True
538 elif decoding["type"] == "Boolean False":
539 decoding["value"] = False
540 elif decoding["type"] == "Unsigned Integer 1-byte value":
541 (decoding["value"],) = struct.unpack(
542 "<B", tlv[self._bytesRead : self._bytesRead + 1]
545 elif decoding["type"] == "Signed Integer 1-byte value":
546 (decoding["value"],) = struct.unpack(
547 "<b", tlv[self._bytesRead : self._bytesRead + 1]
550 elif decoding["type"] == "Unsigned Integer 2-byte value":
551 (decoding["value"],) = struct.unpack(
552 "<H", tlv[self._bytesRead : self._bytesRead + 2]
555 elif decoding["type"] == "Signed Integer 2-byte value":
556 (decoding["value"],) = struct.unpack(
557 "<h", tlv[self._bytesRead : self._bytesRead + 2]
560 elif decoding["type"] == "Unsigned Integer 4-byte value":
561 (decoding["value"],) = struct.unpack(
562 "<L", tlv[self._bytesRead : self._bytesRead + 4]
565 elif decoding["type"] == "Signed Integer 4-byte value":
566 (decoding["value"],) = struct.unpack(
567 "<l", tlv[self._bytesRead : self._bytesRead + 4]
570 elif decoding["type"] == "Unsigned Integer 8-byte value":
571 (decoding["value"],) = struct.unpack(
572 "<Q", tlv[self._bytesRead : self._bytesRead + 8]
575 elif decoding["type"] == "Signed Integer 8-byte value":
576 (decoding["value"],) = struct.unpack(
577 "<q", tlv[self._bytesRead : self._bytesRead + 8]
580 elif decoding["type"] == "Floating Point 4-byte value":
581 (decoding["value"],) = struct.unpack(
582 "<f", tlv[self._bytesRead : self._bytesRead + 4]
585 elif decoding["type"] == "Floating Point 8-byte value":
586 (decoding["value"],) = struct.unpack(
587 "<d", tlv[self._bytesRead : self._bytesRead + 8]
590 elif "UTF-8 String" in decoding["type"]:
591 (val,) = struct.unpack(
592 "<%ds" % decoding["strDataLen"],
593 tlv[self._bytesRead : self._bytesRead + decoding["strDataLen"]],
596 decoding["value"] = str(val, "utf-8")
597 except Exception as ex:
598 decoding["value"] = val
599 self._bytesRead += decoding["strDataLen"]
600 elif "Byte String" in decoding["type"]:
601 (val,) = struct.unpack(
602 "<%ds" % decoding["strDataLen"],
603 tlv[self._bytesRead : self._bytesRead + decoding["strDataLen"]],
606 decoding["value"] = val
607 self._bytesRead += decoding["strDataLen"]
609 raise ValueError("Attempt to decode unsupported TLV type")
611 def _get(self, tlv, decodings, out):
612 endOfEncoding = False
614 while len(tlv[self._bytesRead :]) > 0 and endOfEncoding == False:
616 self._decodeControlAndTag(tlv, decoding)
617 self._decodeStrLength(tlv, decoding)
618 self._decodeVal(tlv, decoding)
619 decodings.append(decoding)
621 if decoding["type"] == "End of Collection":
624 if "profileTag" in list(decoding.keys()):
625 out[decoding["profileTag"]] = decoding["value"]
626 elif "tag" in list(decoding.keys()):
627 if decoding["tag"] is not None:
628 out[decoding["tag"]] = decoding["value"]
630 if isinstance(out, Mapping):
631 out["Any"] = decoding["value"]
632 elif isinstance(out, Sequence):
633 out.append(decoding["value"])
635 raise ValueError("Attempt to decode unsupported TLV tag")
def tlvTagToSortKey(tag):
    """Return an integer sort key that orders TLV tags for encoding.

    Context-specific (int) tags sort first, then tags of the implicit
    profile (profile None), then all other profile tags by profile id; tags
    of the same group sort by tag number.  An anonymous tag (None) sorts
    ahead of everything.

    Raises ValueError if tag is neither None, an int nor a tuple.
    """
    if tag is None:
        return -1
    if isinstance(tag, int):
        majorOrder = 0
    elif isinstance(tag, tuple):
        (profileId, tag) = tag
        if profileId is None:
            majorOrder = 1
        else:
            # Offset by 2 so that profile 0 still sorts after the context
            # (0) and implicit-profile (1) groups.
            majorOrder = profileId + 2
    else:
        raise ValueError("Invalid TLV tag")
    return (majorOrder << 32) + tag
654 if __name__ == "__main__":
662 (6, bytearray([0xDE, 0xAD, 0xBE, 0xEF])),
663 (7, ["Goodbye!", 71024724507, False]),
664 ((0x235A0000, 42), "FOO"),
670 encodedVal = writer.put(None, val)
672 reader = TLVReader(writer.encoding)
675 print("TLVReader input: " + str(val))
676 print("TLVReader output: " + str(out["Any"]))
678 if val == out["Any"]:
679 print("Test Success")
681 print("Test Failure")