From: Dmitriy Kovalev Date: Thu, 7 May 2020 21:26:33 +0000 (-0700) Subject: Implement flexbuffers in python (#5880) X-Git-Tag: v2.0.0~333 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=de89bd193370c8b33686f1f33edd63593e48cd3f;p=platform%2Fupstream%2Fflatbuffers.git Implement flexbuffers in python (#5880) --- diff --git a/python/flatbuffers/flexbuffers.py b/python/flatbuffers/flexbuffers.py new file mode 100644 index 0000000..da10668 --- /dev/null +++ b/python/flatbuffers/flexbuffers.py @@ -0,0 +1,1527 @@ +# Lint as: python3 +# Copyright 2020 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Implementation of FlexBuffers binary format. + +For more info check https://google.github.io/flatbuffers/flexbuffers.html and +corresponding C++ implementation at +https://github.com/google/flatbuffers/blob/master/include/flatbuffers/flexbuffers.h +""" + +# pylint: disable=invalid-name +# TODO(dkovalev): Add type hints everywhere, so tools like pytypes could work. + +import array +import contextlib +import enum +import struct + +__all__ = ('Type', 'Builder', 'GetRoot', 'Dumps', 'Loads') + + +class BitWidth(enum.IntEnum): + """Supported bit widths of value types. + + These are used in the lower 2 bits of a type field to determine the size of + the elements (and or size field) of the item pointed to (e.g. vector). + """ + W8 = 0 # 2^0 = 1 byte + W16 = 1 # 2^1 = 2 bytes + W32 = 2 # 2^2 = 4 bytes + W64 = 3 # 2^3 = 8 bytes + + @staticmethod + def U(value): + """Returns the minimum `BitWidth` to encode unsigned integer value.""" + assert value >= 0 + + if value < (1 << 8): + return BitWidth.W8 + elif value < (1 << 16): + return BitWidth.W16 + elif value < (1 << 32): + return BitWidth.W32 + elif value < (1 << 64): + return BitWidth.W64 + else: + raise ValueError('value is too big to encode: %s' % value) + + @staticmethod + def I(value): + """Returns the minimum `BitWidth` to encode signed integer value.""" + # -2^(n-1) <= value < 2^(n-1) + # -2^n <= 2 * value < 2^n + # 2 * value < 2^n, when value >= 0 or 2 * (-value) <= 2^n, when value < 0 + # 2 * value < 2^n, when value >= 0 or 2 * (-value) - 1 < 2^n, when value < 0 + # + # if value >= 0: + # return BitWidth.U(2 * value) + # else: + # return BitWidth.U(2 * (-value) - 1) # ~x = -x - 1 + value *= 2 + return BitWidth.U(value if value >= 0 else ~value) + + @staticmethod + def F(value): + """Returns the `BitWidth` to encode floating point value.""" + if struct.unpack('f', struct.pack('f', value))[0] == value: + return BitWidth.W32 + return BitWidth.W64 + + @staticmethod + def B(byte_width): + return { + 1: BitWidth.W8, + 2: BitWidth.W16, + 4: BitWidth.W32, + 8: BitWidth.W64 + }[byte_width] + + +I = {1: 'b', 2: 'h', 4: 'i', 8: 'q'} # Integer formats +U = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'} # Unsigned integer formats +F = {4: 'f', 8: 'd'} # Floating point formats + + +def _Unpack(fmt, buf): + return struct.unpack(fmt[len(buf)], buf)[0] + + +def _UnpackVector(fmt, buf, length): + byte_width = len(buf) // length + return struct.unpack('%d%s' % (length, fmt[byte_width]), buf) + + +def _Pack(fmt, value, byte_width): + return struct.pack(fmt[byte_width], value) + + +def _PackVector(fmt, values, byte_width): + return struct.pack('%d%s' % (len(values), fmt[byte_width]), *values) + + +def _Mutate(fmt, buf, value, byte_width, value_bit_width): + if (1 << value_bit_width) <= byte_width: + buf[:byte_width] = _Pack(fmt, value, byte_width) + return True + return False + + +# Computes how many bytes you'd have to pad to be able to write an +# "scalar_size" scalar if the buffer had grown to "buf_size", +# "scalar_size" is a power of two. +def _PaddingBytes(buf_size, scalar_size): + # ((buf_size + (scalar_size - 1)) // scalar_size) * scalar_size - buf_size + return -buf_size & (scalar_size - 1) + + +def _ShiftSlice(s, offset, length): + start = offset + (0 if s.start is None else s.start) + stop = offset + (length if s.stop is None else s.stop) + return slice(start, stop, s.step) + + +# https://en.cppreference.com/w/cpp/algorithm/lower_bound +def _LowerBound(values, value, pred): + """Implementation of C++ std::lower_bound() algorithm.""" + first, last = 0, len(values) + count = last - first + while count > 0: + i = first + step = count // 2 + i += step + if pred(values[i], value): + i += 1 + first = i + count -= step + 1 + else: + count = step + return first + + +# https://en.cppreference.com/w/cpp/algorithm/binary_search +def _BinarySearch(values, value, pred=lambda x, y: x < y): + """Implementation of C++ std::binary_search() algorithm.""" + index = _LowerBound(values, value, pred) + if index != len(values) and not pred(value, values[index]): + return index + return -1 + + +class Type(enum.IntEnum): + """Supported types of encoded data. + + These are used as the upper 6 bits of a type field to indicate the actual + type. + """ + NULL = 0 + INT = 1 + UINT = 2 + FLOAT = 3 + # Types above stored inline, types below store an offset. + KEY = 4 + STRING = 5 + INDIRECT_INT = 6 + INDIRECT_UINT = 7 + INDIRECT_FLOAT = 8 + MAP = 9 + VECTOR = 10 # Untyped. + + VECTOR_INT = 11 # Typed any size (stores no type table). + VECTOR_UINT = 12 + VECTOR_FLOAT = 13 + VECTOR_KEY = 14 + # DEPRECATED, use VECTOR or VECTOR_KEY instead. + # Read test.cpp/FlexBuffersDeprecatedTest() for details on why. + VECTOR_STRING_DEPRECATED = 15 + + VECTOR_INT2 = 16 # Typed tuple (no type table, no size field). + VECTOR_UINT2 = 17 + VECTOR_FLOAT2 = 18 + VECTOR_INT3 = 19 # Typed triple (no type table, no size field). + VECTOR_UINT3 = 20 + VECTOR_FLOAT3 = 21 + VECTOR_INT4 = 22 # Typed quad (no type table, no size field). + VECTOR_UINT4 = 23 + VECTOR_FLOAT4 = 24 + + BLOB = 25 + BOOL = 26 + VECTOR_BOOL = 36 # To do the same type of conversion of type to vector type + + @staticmethod + def Pack(type_, bit_width): + return (int(type_) << 2) | bit_width + + @staticmethod + def Unpack(packed_type): + return 1 << (packed_type & 0b11), Type(packed_type >> 2) + + @staticmethod + def IsInline(type_): + return type_ <= Type.FLOAT or type_ == Type.BOOL + + @staticmethod + def IsTypedVector(type_): + return Type.VECTOR_INT <= type_ <= Type.VECTOR_STRING_DEPRECATED or \ + type_ == Type.VECTOR_BOOL + + @staticmethod + def IsTypedVectorElementType(type_): + return Type.INT <= type_ <= Type.STRING or type_ == Type.BOOL + + @staticmethod + def ToTypedVectorElementType(type_): + if not Type.IsTypedVector(type_): + raise ValueError('must be typed vector type') + + return Type(type_ - Type.VECTOR_INT + Type.INT) + + @staticmethod + def IsFixedTypedVector(type_): + return Type.VECTOR_INT2 <= type_ <= Type.VECTOR_FLOAT4 + + @staticmethod + def IsFixedTypedVectorElementType(type_): + return Type.INT <= type_ <= Type.FLOAT + + @staticmethod + def ToFixedTypedVectorElementType(type_): + if not Type.IsFixedTypedVector(type_): + raise ValueError('must be fixed typed vector type') + + # 3 types each, starting from length 2. + fixed_type = type_ - Type.VECTOR_INT2 + return Type(fixed_type % 3 + Type.INT), fixed_type // 3 + 2 + + @staticmethod + def ToTypedVector(element_type, fixed_len=0): + """Converts element type to corresponding vector type. + + Args: + element_type: vector element type + fixed_len: number of elements: 0 for typed vector; 2, 3, or 4 for fixed + typed vector. + + Returns: + Typed vector type or fixed typed vector type. + """ + if fixed_len == 0: + if not Type.IsTypedVectorElementType(element_type): + raise ValueError('must be typed vector element type') + else: + if not Type.IsFixedTypedVectorElementType(element_type): + raise ValueError('must be fixed typed vector element type') + + offset = element_type - Type.INT + if fixed_len == 0: + return Type(offset + Type.VECTOR_INT) # TypedVector + elif fixed_len == 2: + return Type(offset + Type.VECTOR_INT2) # FixedTypedVector + elif fixed_len == 3: + return Type(offset + Type.VECTOR_INT3) # FixedTypedVector + elif fixed_len == 4: + return Type(offset + Type.VECTOR_INT4) # FixedTypedVector + else: + raise ValueError('unsupported fixed_len: %s' % fixed_len) + + +class Buf: + """Class to access underlying buffer object starting from the given offset.""" + + def __init__(self, buf, offset): + self._buf = buf + self._offset = offset if offset >= 0 else len(buf) + offset + self._length = len(buf) - self._offset + + def __getitem__(self, key): + if isinstance(key, slice): + return self._buf[_ShiftSlice(key, self._offset, self._length)] + elif isinstance(key, int): + return self._buf[self._offset + key] + else: + raise TypeError('invalid key type') + + def __setitem__(self, key, value): + if isinstance(key, slice): + self._buf[_ShiftSlice(key, self._offset, self._length)] = value + elif isinstance(key, int): + self._buf[self._offset + key] = key + else: + raise TypeError('invalid key type') + + def __repr__(self): + return 'buf[%d:]' % self._offset + + def Find(self, sub): + """Returns the lowest index where the sub subsequence is found.""" + return self._buf[self._offset:].find(sub) + + def Slice(self, offset): + """Returns new `Buf` which starts from the given offset.""" + return Buf(self._buf, self._offset + offset) + + def Indirect(self, offset, byte_width): + """Return new `Buf` based on the encoded offset (indirect encoding).""" + return self.Slice(offset - _Unpack(U, self[offset:offset + byte_width])) + + +class Object: + """Base class for all non-trivial data accessors.""" + __slots__ = '_buf', '_byte_width' + + def __init__(self, buf, byte_width): + self._buf = buf + self._byte_width = byte_width + + @property + def ByteWidth(self): + return self._byte_width + + +class Sized(Object): + """Base class for all data accessors which need to read encoded size.""" + __slots__ = '_size', + + def __init__(self, buf, byte_width, size=0): + super().__init__(buf, byte_width) + if size == 0: + self._size = _Unpack(U, self.SizeBytes) + else: + self._size = size + + @property + def SizeBytes(self): + return self._buf[-self._byte_width:0] + + def __len__(self): + return self._size + + +class Blob(Sized): + """Data accessor for the encoded blob bytes.""" + __slots__ = () + + @property + def Bytes(self): + return self._buf[0:len(self)] + + def __repr__(self): + return 'Blob(%s, size=%d)' % (self._buf, len(self)) + + +class String(Sized): + """Data accessor for the encoded string bytes.""" + __slots__ = () + + @property + def Bytes(self): + return self._buf[0:len(self)] + + def Mutate(self, value): + """Mutates underlying string bytes in place. + + Args: + value: New string to replace the existing one. New string must have less + or equal UTF-8-encoded bytes than the existing one to successfully + mutate underlying byte buffer. + + Returns: + Whether the value was mutated or not. + """ + encoded = value.encode('utf-8') + n = len(encoded) + if n <= len(self): + self._buf[-self._byte_width:0] = _Pack(U, n, self._byte_width) + self._buf[0:n] = encoded + self._buf[n:len(self)] = bytearray(len(self) - n) + return True + return False + + def __str__(self): + return self.Bytes.decode('utf-8') + + def __repr__(self): + return 'String(%s, size=%d)' % (self._buf, len(self)) + + +class Key(Object): + """Data accessor for the encoded key bytes.""" + __slots__ = () + + def __init__(self, buf, byte_width): + assert byte_width == 1 + super().__init__(buf, byte_width) + + @property + def Bytes(self): + return self._buf[0:len(self)] + + def __len__(self): + return self._buf.Find(0) + + def __str__(self): + return self.Bytes.decode('ascii') + + def __repr__(self): + return 'Key(%s, size=%d)' % (self._buf, len(self)) + + +class Vector(Sized): + """Data accessor for the encoded vector bytes.""" + __slots__ = () + + def __getitem__(self, index): + if index < 0 or index >= len(self): + raise IndexError('vector index %s is out of [0, %d) range' % \ + (index, len(self))) + + packed_type = self._buf[len(self) * self._byte_width + index] + buf = self._buf.Slice(index * self._byte_width) + return Ref.PackedType(buf, self._byte_width, packed_type) + + @property + def Value(self): + """Returns the underlying encoded data as a list object.""" + return [e.Value for e in self] + + def __repr__(self): + return 'Vector(%s, byte_width=%d, size=%d)' % \ + (self._buf, self._byte_width, self._size) + + +class TypedVector(Sized): + """Data accessor for the encoded typed vector or fixed typed vector bytes.""" + __slots__ = '_element_type', '_size' + + def __init__(self, buf, byte_width, element_type, size=0): + super().__init__(buf, byte_width, size) + + if element_type == Type.STRING: + # These can't be accessed as strings, since we don't know the bit-width + # of the size field, see the declaration of + # FBT_VECTOR_STRING_DEPRECATED above for details. + # We change the type here to be keys, which are a subtype of strings, + # and will ignore the size field. This will truncate strings with + # embedded nulls. + element_type = Type.KEY + + self._element_type = element_type + + @property + def Bytes(self): + return self._buf[:self._byte_width * len(self)] + + @property + def ElementType(self): + return self._element_type + + def __getitem__(self, index): + if index < 0 or index >= len(self): + raise IndexError('vector index %s is out of [0, %d) range' % \ + (index, len(self))) + + buf = self._buf.Slice(index * self._byte_width) + return Ref(buf, self._byte_width, 1, self._element_type) + + @property + def Value(self): + """Returns underlying data as list object.""" + if not self: + return [] + + if self._element_type is Type.BOOL: + return [bool(e) for e in _UnpackVector(U, self.Bytes, len(self))] + elif self._element_type is Type.INT: + return list(_UnpackVector(I, self.Bytes, len(self))) + elif self._element_type is Type.UINT: + return list(_UnpackVector(U, self.Bytes, len(self))) + elif self._element_type is Type.FLOAT: + return list(_UnpackVector(F, self.Bytes, len(self))) + elif self._element_type is Type.KEY: + return [e.AsKey for e in self] + elif self._element_type is Type.STRING: + return [e.AsString for e in self] + else: + raise TypeError('unsupported element_type: %s' % self._element_type) + + def __repr__(self): + return 'TypedVector(%s, byte_width=%d, element_type=%s, size=%d)' % \ + (self._buf, self._byte_width, self._element_type, self._size) + + +class Map(Vector): + """Data accessor for the encoded map bytes.""" + + @staticmethod + def CompareKeys(a, b): + if isinstance(a, Ref): + a = a.AsKeyBytes + if isinstance(b, Ref): + b = b.AsKeyBytes + return a < b + + def __getitem__(self, key): + if isinstance(key, int): + return super().__getitem__(key) + + index = _BinarySearch(self.Keys, key.encode('ascii'), self.CompareKeys) + if index != -1: + return super().__getitem__(index) + + raise KeyError(key) + + @property + def Keys(self): + byte_width = _Unpack(U, self._buf[-2 * self._byte_width:-self._byte_width]) + buf = self._buf.Indirect(-3 * self._byte_width, self._byte_width) + return TypedVector(buf, byte_width, Type.KEY) + + @property + def Values(self): + return Vector(self._buf, self._byte_width) + + @property + def Value(self): + return {k.Value: v.Value for k, v in zip(self.Keys, self.Values)} + + def __repr__(self): + return 'Map(%s, size=%d)' % (self._buf, len(self)) + + +class Ref: + """Data accessor for the encoded data bytes.""" + __slots__ = '_buf', '_parent_width', '_byte_width', '_type' + + @staticmethod + def PackedType(buf, parent_width, packed_type): + byte_width, type_ = Type.Unpack(packed_type) + return Ref(buf, parent_width, byte_width, type_) + + def __init__(self, buf, parent_width, byte_width, type_): + self._buf = buf + self._parent_width = parent_width + self._byte_width = byte_width + self._type = type_ + + def __repr__(self): + return 'Ref(%s, parent_width=%d, byte_width=%d, type_=%s)' % \ + (self._buf, self._parent_width, self._byte_width, self._type) + + @property + def _Bytes(self): + return self._buf[:self._parent_width] + + def _ConvertError(self, target_type): + raise TypeError('cannot convert %s to %s' % (self._type, target_type)) + + def _Indirect(self): + return self._buf.Indirect(0, self._parent_width) + + @property + def IsNull(self): + return self._type is Type.NULL + + @property + def IsBool(self): + return self._type is Type.BOOL + + @property + def AsBool(self): + if self._type is Type.BOOL: + return bool(_Unpack(U, self._Bytes)) + else: + return self.AsInt != 0 + + def MutateBool(self, value): + """Mutates underlying boolean value bytes in place. + + Args: + value: New boolean value. + + Returns: + Whether the value was mutated or not. + """ + return self.IsBool and \ + _Mutate(U, self._buf, value, self._parent_width, BitWidth.W8) + + @property + def IsNumeric(self): + return self.IsInt or self.IsFloat + + @property + def IsInt(self): + return self._type in (Type.INT, Type.INDIRECT_INT, Type.UINT, + Type.INDIRECT_UINT) + + @property + def AsInt(self): + """Returns current reference as integer value.""" + if self.IsNull: + return 0 + elif self.IsBool: + return int(self.AsBool) + elif self._type is Type.INT: + return _Unpack(I, self._Bytes) + elif self._type is Type.INDIRECT_INT: + return _Unpack(I, self._Indirect()[:self._byte_width]) + if self._type is Type.UINT: + return _Unpack(U, self._Bytes) + elif self._type is Type.INDIRECT_UINT: + return _Unpack(U, self._Indirect()[:self._byte_width]) + elif self.IsString: + return len(self.AsString) + elif self.IsKey: + return len(self.AsKey) + elif self.IsBlob: + return len(self.AsBlob) + elif self.IsVector: + return len(self.AsVector) + elif self.IsTypedVector: + return len(self.AsTypedVector) + elif self.IsFixedTypedVector: + return len(self.AsFixedTypedVector) + else: + raise self._ConvertError(Type.INT) + + def MutateInt(self, value): + """Mutates underlying integer value bytes in place. + + Args: + value: New integer value. It must fit to the byte size of the existing + encoded value. + + Returns: + Whether the value was mutated or not. + """ + if self._type is Type.INT: + return _Mutate(I, self._buf, value, self._parent_width, BitWidth.I(value)) + elif self._type is Type.INDIRECT_INT: + return _Mutate(I, self._Indirect(), value, self._byte_width, + BitWidth.I(value)) + elif self._type is Type.UINT: + return _Mutate(U, self._buf, value, self._parent_width, BitWidth.U(value)) + elif self._type is Type.INDIRECT_UINT: + return _Mutate(U, self._Indirect(), value, self._byte_width, + BitWidth.U(value)) + else: + return False + + @property + def IsFloat(self): + return self._type in (Type.FLOAT, Type.INDIRECT_FLOAT) + + @property + def AsFloat(self): + """Returns current reference as floating point value.""" + if self.IsNull: + return 0.0 + elif self.IsBool: + return float(self.AsBool) + elif self.IsInt: + return float(self.AsInt) + elif self._type is Type.FLOAT: + return _Unpack(F, self._Bytes) + elif self._type is Type.INDIRECT_FLOAT: + return _Unpack(F, self._Indirect()[:self._byte_width]) + elif self.IsString: + return float(self.AsString) + elif self.IsVector: + return float(len(self.AsVector)) + elif self.IsTypedVector(): + return float(len(self.AsTypedVector)) + elif self.IsFixedTypedVector(): + return float(len(self.FixedTypedVector)) + else: + raise self._ConvertError(Type.FLOAT) + + def MutateFloat(self, value): + """Mutates underlying floating point value bytes in place. + + Args: + value: New float value. It must fit to the byte size of the existing + encoded value. + + Returns: + Whether the value was mutated or not. + """ + if self._type is Type.FLOAT: + return _Mutate(F, self._buf, value, self._parent_width, + BitWidth.B(self._parent_width)) + elif self._type is Type.INDIRECT_FLOAT: + return _Mutate(F, self._Indirect(), value, self._byte_width, + BitWidth.B(self._byte_width)) + else: + return False + + @property + def IsKey(self): + return self._type is Type.KEY + + @property + def AsKeyBytes(self): + if self.IsKey: + return Key(self._Indirect(), self._byte_width).Bytes + else: + raise self._ConvertError(Type.KEY) + + @property + def AsKey(self): + if self.IsKey: + return str(Key(self._Indirect(), self._byte_width)) + else: + raise self._ConvertError(Type.KEY) + + @property + def IsString(self): + return self._type is Type.STRING + + @property + def AsString(self): + if self.IsString: + return str(String(self._Indirect(), self._byte_width)) + elif self.IsKey: + return self.AsKey + else: + raise self._ConvertError(Type.STRING) + + def MutateString(self, value): + return String(self._Indirect(), self._byte_width).Mutate(value) + + @property + def IsBlob(self): + return self._type is Type.BLOB + + @property + def AsBlob(self): + if self.IsBlob: + return Blob(self._Indirect(), self._byte_width).Bytes + else: + raise self._ConvertError(Type.BLOB) + + @property + def IsAnyVector(self): + return self.IsVector or self.IsTypedVector or self.IsFixedTypedVector() + + @property + def IsVector(self): + return self._type in (Type.VECTOR, Type.MAP) + + @property + def AsVector(self): + if self.IsVector: + return Vector(self._Indirect(), self._byte_width) + else: + raise self._ConvertError(Type.VECTOR) + + @property + def IsTypedVector(self): + return Type.IsTypedVector(self._type) + + @property + def AsTypedVector(self): + if self.IsTypedVector: + return TypedVector(self._Indirect(), self._byte_width, + Type.ToTypedVectorElementType(self._type)) + else: + raise self._ConvertError('TYPED_VECTOR') + + @property + def IsFixedTypedVector(self): + return Type.IsFixedTypedVector(self._type) + + @property + def AsFixedTypedVector(self): + if self.IsFixedTypedVector: + element_type, size = Type.ToFixedTypedVectorElementType(self._type) + return TypedVector(self._Indirect(), self._byte_width, element_type, size) + else: + raise self._ConvertError('FIXED_TYPED_VECTOR') + + @property + def IsMap(self): + return self._type is Type.MAP + + @property + def AsMap(self): + if self.IsMap: + return Map(self._Indirect(), self._byte_width) + else: + raise self._ConvertError(Type.MAP) + + @property + def Value(self): + """Converts current reference to value of corresponding type. + + This is equivalent to calling `AsInt` for integer values, `AsFloat` for + floating point values, etc. + + Returns: + Value of corresponding type. + """ + if self.IsNull: + return None + elif self.IsBool: + return self.AsBool + elif self.IsInt: + return self.AsInt + elif self.IsFloat: + return self.AsFloat + elif self.IsString: + return self.AsString + elif self.IsKey: + return self.AsKey + elif self.IsBlob: + return self.AsBlob + elif self.IsMap: + return self.AsMap.Value + elif self.IsVector: + return self.AsVector.Value + elif self.IsTypedVector: + return self.AsTypedVector.Value + elif self.IsFixedTypedVector: + return self.AsFixedTypedVector.Value + else: + raise TypeError('cannot convert %r to value' % self) + + +def _IsIterable(obj): + try: + iter(obj) + return True + except TypeError: + return False + + +class Value: + """Class to represent given value during the encoding process.""" + + @staticmethod + def Null(): + return Value(0, Type.NULL, BitWidth.W8) + + @staticmethod + def Bool(value): + return Value(value, Type.BOOL, BitWidth.W8) + + @staticmethod + def Int(value, bit_width): + return Value(value, Type.INT, bit_width) + + @staticmethod + def UInt(value, bit_width): + return Value(value, Type.UINT, bit_width) + + @staticmethod + def Float(value, bit_width): + return Value(value, Type.FLOAT, bit_width) + + @staticmethod + def Key(offset): + return Value(offset, Type.KEY, BitWidth.W8) + + def __init__(self, value, type_, min_bit_width): + self._value = value + self._type = type_ + + # For scalars: of itself, for vector: of its elements, for string: length. + self._min_bit_width = min_bit_width + + @property + def Value(self): + return self._value + + @property + def Type(self): + return self._type + + @property + def MinBitWidth(self): + return self._min_bit_width + + def StoredPackedType(self, parent_bit_width=BitWidth.W8): + return Type.Pack(self._type, self.StoredWidth(parent_bit_width)) + + # We have an absolute offset, but want to store a relative offset + # elem_index elements beyond the current buffer end. Since whether + # the relative offset fits in a certain byte_width depends on + # the size of the elements before it (and their alignment), we have + # to test for each size in turn. + def ElemWidth(self, buf_size, elem_index=0): + if Type.IsInline(self._type): + return self._min_bit_width + for byte_width in 1, 2, 4, 8: + offset_loc = buf_size + _PaddingBytes(buf_size, byte_width) + \ + elem_index * byte_width + bit_width = BitWidth.U(offset_loc - self._value) + if byte_width == (1 << bit_width): + return bit_width + raise ValueError('relative offset is too big') + + def StoredWidth(self, parent_bit_width=BitWidth.W8): + if Type.IsInline(self._type): + return max(self._min_bit_width, parent_bit_width) + return self._min_bit_width + + def __repr__(self): + return 'Value(%s, %s, %s)' % (self._value, self._type, self._min_bit_width) + + def __str__(self): + return str(self._value) + + +def InMap(func): + def wrapper(self, *args, **kwargs): + if isinstance(args[0], str): + self.Key(args[0]) + func(self, *args[1:], **kwargs) + else: + func(self, *args, **kwargs) + return wrapper + + +def InMapForString(func): + def wrapper(self, *args): + if len(args) == 1: + func(self, args[0]) + elif len(args) == 2: + self.Key(args[0]) + func(self, args[1]) + else: + raise ValueError('invalid number of arguments') + return wrapper + + +class Pool: + """Collection of (data, offset) pairs sorted by data for quick access.""" + + def __init__(self): + self._pool = [] # sorted list of (data, offset) tuples + + def FindOrInsert(self, data, offset): + do = data, offset + index = _BinarySearch(self._pool, do, lambda a, b: a[0] < b[0]) + if index != -1: + _, offset = self._pool[index] + return offset + self._pool.insert(index, do) + return None + + def Clear(self): + self._pool = [] + + @property + def Elements(self): + return [data for data, _ in self._pool] + + +class Builder: + """Helper class to encode structural data into flexbuffers format.""" + + def __init__(self, + share_strings=False, + share_keys=True, + force_min_bit_width=BitWidth.W8): + self._share_strings = share_strings + self._share_keys = share_keys + self._force_min_bit_width = force_min_bit_width + + self._string_pool = Pool() + self._key_pool = Pool() + + self._finished = False + self._buf = bytearray() + self._stack = [] + + def __len__(self): + return len(self._buf) + + @property + def StringPool(self): + return self._string_pool + + @property + def KeyPool(self): + return self._key_pool + + def Clear(self): + self._string_pool.Clear() + self._key_pool.Clear() + self._finished = False + self._buf = bytearray() + self._stack = [] + + def Finish(self): + """Finishes encoding process and returns underlying buffer.""" + if self._finished: + raise RuntimeError('builder has been already finished') + + # If you hit this exception, you likely have objects that were never + # included in a parent. You need to have exactly one root to finish a + # buffer. Check your Start/End calls are matched, and all objects are inside + # some other object. + if len(self._stack) != 1: + raise RuntimeError('internal stack size must be one') + + value = self._stack[0] + byte_width = self._Align(value.ElemWidth(len(self._buf))) + self._WriteAny(value, byte_width=byte_width) # Root value + self._Write(U, value.StoredPackedType(), byte_width=1) # Root type + self._Write(U, byte_width, byte_width=1) # Root size + + self.finished = True + return self._buf + + def _ReadKey(self, offset): + key = self._buf[offset:] + return key[:key.find(0)] + + def _Align(self, alignment): + byte_width = 1 << alignment + self._buf.extend(b'\x00' * _PaddingBytes(len(self._buf), byte_width)) + return byte_width + + def _Write(self, fmt, value, byte_width): + self._buf.extend(_Pack(fmt, value, byte_width)) + + def _WriteVector(self, fmt, values, byte_width): + self._buf.extend(_PackVector(fmt, values, byte_width)) + + def _WriteOffset(self, offset, byte_width): + relative_offset = len(self._buf) - offset + assert byte_width == 8 or relative_offset < (1 << (8 * byte_width)) + self._Write(U, relative_offset, byte_width) + + def _WriteAny(self, value, byte_width): + fmt = { + Type.NULL: U, Type.BOOL: U, Type.INT: I, Type.UINT: U, Type.FLOAT: F + }.get(value.Type) + if fmt: + self._Write(fmt, value.Value, byte_width) + else: + self._WriteOffset(value.Value, byte_width) + + def _WriteBlob(self, data, append_zero, type_): + bit_width = BitWidth.U(len(data)) + byte_width = self._Align(bit_width) + self._Write(U, len(data), byte_width) + loc = len(self._buf) + self._buf.extend(data) + if append_zero: + self._buf.append(0) + self._stack.append(Value(loc, type_, bit_width)) + return loc + + def _WriteScalarVector(self, element_type, byte_width, elements, fixed): + """Writes scalar vector elements to the underlying buffer.""" + bit_width = BitWidth.B(byte_width) + # If you get this exception, you're trying to write a vector with a size + # field that is bigger than the scalars you're trying to write (e.g. a + # byte vector > 255 elements). For such types, write a "blob" instead. + if BitWidth.U(len(elements)) > bit_width: + raise ValueError('too many elements for the given byte_width') + + self._Align(bit_width) + if not fixed: + self._Write(U, len(elements), byte_width) + + loc = len(self._buf) + + fmt = {Type.INT: I, Type.UINT: U, Type.FLOAT: F}.get(element_type) + if not fmt: + raise TypeError('unsupported element_type') + self._WriteVector(fmt, elements, byte_width) + + type_ = Type.ToTypedVector(element_type, len(elements) if fixed else 0) + self._stack.append(Value(loc, type_, bit_width)) + return loc + + def _CreateVector(self, elements, typed, fixed, keys=None): + """Writes vector elements to the underlying buffer.""" + length = len(elements) + + if fixed and not typed: + raise ValueError('fixed vector must be typed') + + # Figure out smallest bit width we can store this vector with. + bit_width = max(self._force_min_bit_width, BitWidth.U(length)) + prefix_elems = 1 # Vector size + if keys: + bit_width = max(bit_width, keys.ElemWidth(len(self._buf))) + prefix_elems += 2 # Offset to the keys vector and its byte width. + + vector_type = Type.KEY + # Check bit widths and types for all elements. + for i, e in enumerate(elements): + bit_width = max(bit_width, e.ElemWidth(len(self._buf), prefix_elems + i)) + + if typed: + if i == 0: + vector_type = e.Type + else: + if vector_type != e.Type: + raise RuntimeError('typed vector elements must be of the same type') + + if fixed and not Type.IsFixedTypedVectorElementType(vector_type): + raise RuntimeError('must be fixed typed vector element type') + + byte_width = self._Align(bit_width) + # Write vector. First the keys width/offset if available, and size. + if keys: + self._WriteOffset(keys.Value, byte_width) + self._Write(U, 1 << keys.MinBitWidth, byte_width) + + if not fixed: + self._Write(U, length, byte_width) + + # Then the actual data. + loc = len(self._buf) + for e in elements: + self._WriteAny(e, byte_width) + + # Then the types. + if not typed: + for e in elements: + self._buf.append(e.StoredPackedType(bit_width)) + + if keys: + type_ = Type.MAP + else: + if typed: + type_ = Type.ToTypedVector(vector_type, length if fixed else 0) + else: + type_ = Type.VECTOR + + return Value(loc, type_, bit_width) + + def _PushIndirect(self, value, type_, bit_width): + byte_width = self._Align(bit_width) + loc = len(self._buf) + fmt = { + Type.INDIRECT_INT: I, + Type.INDIRECT_UINT: U, + Type.INDIRECT_FLOAT: F + }[type_] + self._Write(fmt, value, byte_width) + self._stack.append(Value(loc, type_, bit_width)) + + @InMapForString + def String(self, value): + """Encodes string value.""" + reset_to = len(self._buf) + encoded = value.encode('utf-8') + loc = self._WriteBlob(encoded, append_zero=True, type_=Type.STRING) + if self._share_strings: + prev_loc = self._string_pool.FindOrInsert(encoded, loc) + if prev_loc is not None: + del self._buf[reset_to:] + self._stack[-1]._value = loc = prev_loc # pylint: disable=protected-access + + return loc + + @InMap + def Blob(self, value): + """Encodes binary blob value. + + Args: + value: A byte/bytearray value to encode + + Returns: + Offset of the encoded value in underlying the byte buffer. + """ + return self._WriteBlob(value, append_zero=False, type_=Type.BLOB) + + def Key(self, value): + """Encodes key value. + + Args: + value: A byte/bytearray/str value to encode. Byte object must not contain + zero bytes. String object must be convertible to ASCII. + + Returns: + Offset of the encoded value in the underlying byte buffer. + """ + if isinstance(value, (bytes, bytearray)): + encoded = value + else: + encoded = value.encode('ascii') + + if 0 in encoded: + raise ValueError('key contains zero byte') + + loc = len(self._buf) + self._buf.extend(encoded) + self._buf.append(0) + if self._share_keys: + prev_loc = self._key_pool.FindOrInsert(encoded, loc) + if prev_loc is not None: + del self._buf[loc:] + loc = prev_loc + + self._stack.append(Value.Key(loc)) + return loc + + def Null(self, key=None): + """Encodes None value.""" + if key: + self.Key(key) + self._stack.append(Value.Null()) + + @InMap + def Bool(self, value): + """Encodes boolean value. + + Args: + value: A boolean value. + """ + self._stack.append(Value.Bool(value)) + + @InMap + def Int(self, value, byte_width=0): + """Encodes signed integer value. + + Args: + value: A signed integer value. + byte_width: Number of bytes to use: 1, 2, 4, or 8. + """ + bit_width = BitWidth.I(value) if byte_width == 0 else BitWidth.B(byte_width) + self._stack.append(Value.Int(value, bit_width)) + + @InMap + def IndirectInt(self, value, byte_width=0): + """Encodes signed integer value indirectly. + + Args: + value: A signed integer value. + byte_width: Number of bytes to use: 1, 2, 4, or 8. + """ + bit_width = BitWidth.I(value) if byte_width == 0 else BitWidth.B(byte_width) + self._PushIndirect(value, Type.INDIRECT_INT, bit_width) + + @InMap + def UInt(self, value, byte_width=0): + """Encodes unsigned integer value. + + Args: + value: An unsigned integer value. + byte_width: Number of bytes to use: 1, 2, 4, or 8. + """ + bit_width = BitWidth.U(value) if byte_width == 0 else BitWidth.B(byte_width) + self._stack.append(Value.UInt(value, bit_width)) + + @InMap + def IndirectUInt(self, value, byte_width=0): + """Encodes unsigned integer value indirectly. + + Args: + value: An unsigned integer value. + byte_width: Number of bytes to use: 1, 2, 4, or 8. + """ + bit_width = BitWidth.U(value) if byte_width == 0 else BitWidth.B(byte_width) + self._PushIndirect(value, Type.INDIRECT_UINT, bit_width) + + @InMap + def Float(self, value, byte_width=0): + """Encodes floating point value. + + Args: + value: A floating point value. + byte_width: Number of bytes to use: 4 or 8. + """ + bit_width = BitWidth.F(value) if byte_width == 0 else BitWidth.B(byte_width) + self._stack.append(Value.Float(value, bit_width)) + + @InMap + def IndirectFloat(self, value, byte_width=0): + """Encodes floating point value indirectly. + + Args: + value: A floating point value. + byte_width: Number of bytes to use: 4 or 8. + """ + bit_width = BitWidth.F(value) if byte_width == 0 else BitWidth.B(byte_width) + self._PushIndirect(value, Type.INDIRECT_FLOAT, bit_width) + + def _StartVector(self): + """Starts vector construction.""" + return len(self._stack) + + def _EndVector(self, start, typed, fixed): + """Finishes vector construction by encodung its elements.""" + vec = self._CreateVector(self._stack[start:], typed, fixed) + del self._stack[start:] + self._stack.append(vec) + return vec.Value + + @contextlib.contextmanager + def Vector(self, key=None): + if key: + self.Key(key) + + try: + start = self._StartVector() + yield self + finally: + self._EndVector(start, typed=False, fixed=False) + + @InMap + def VectorFromElements(self, elements): + """Encodes sequence of any elements as a vector. + + Args: + elements: sequence of elements, they may have different types. + """ + with self.Vector(): + for e in elements: + self.Add(e) + + @contextlib.contextmanager + def TypedVector(self, key=None): + if key: + self.Key(key) + + try: + start = self._StartVector() + yield self + finally: + self._EndVector(start, typed=True, fixed=False) + + @InMap + def TypedVectorFromElements(self, elements, element_type=None): + """Encodes sequence of elements of the same type as typed vector. + + Args: + elements: Sequence of elements, they must be of the same type. + element_type: Suggested element type. Setting it to None means determining + correct value automatically based on the given elements. + """ + if isinstance(elements, array.array): + if elements.typecode == 'f': + self._WriteScalarVector(Type.FLOAT, 4, elements, fixed=False) + elif elements.typecode == 'd': + self._WriteScalarVector(Type.FLOAT, 8, elements, fixed=False) + elif elements.typecode in ('b', 'h', 'i', 'l', 'q'): + self._WriteScalarVector( + Type.INT, elements.itemsize, elements, fixed=False) + elif elements.typecode in ('B', 'H', 'I', 'L', 'Q'): + self._WriteScalarVector( + Type.UINT, elements.itemsize, elements, fixed=False) + else: + raise ValueError('unsupported array typecode: %s' % elements.typecode) + else: + add = self.Add if element_type is None else self.Adder(element_type) + with self.TypedVector(): + for e in elements: + add(e) + + @InMap + def FixedTypedVectorFromElements(self, + elements, + element_type=None, + byte_width=0): + """Encodes sequence of elements of the same type as fixed typed vector. + + Args: + elements: Sequence of elements, they must be of the same type. Allowed + types are `Type.INT`, `Type.UINT`, `Type.FLOAT`. Allowed number of + elements are 2, 3, or 4. + element_type: Suggested element type. Setting it to None means determining + correct value automatically based on the given elements. + byte_width: Number of bytes to use per element. For `Type.INT` and + `Type.UINT`: 1, 2, 4, or 8. For `Type.FLOAT`: 4 or 8. Setting it to 0 + means determining correct value automatically based on the given + elements. + """ + if not 2 <= len(elements) <= 4: + raise ValueError('only 2, 3, or 4 elements are supported') + + types = {type(e) for e in elements} + if len(types) != 1: + raise TypeError('all elements must be of the same type') + + type_, = types + + if element_type is None: + element_type = {int: Type.INT, float: Type.FLOAT}.get(type_) + if not element_type: + raise TypeError('unsupported element_type: %s' % type_) + + if byte_width == 0: + width = { + Type.UINT: BitWidth.U, + Type.INT: BitWidth.I, + Type.FLOAT: BitWidth.F + }[element_type] + byte_width = 1 << max(width(e) for e in elements) + + self._WriteScalarVector(element_type, byte_width, elements, fixed=True) + + def _StartMap(self): + """Starts map construction.""" + return len(self._stack) + + def _EndMap(self, start): + """Finishes map construction by encodung its elements.""" + # Interleaved keys and values on the stack. + stack = self._stack[start:] + + if len(stack) % 2 != 0: + raise RuntimeError('must be even number of keys and values') + + for key in stack[::2]: + if key.Type is not Type.KEY: + raise RuntimeError('all map keys must be of %s type' % Type.KEY) + + pairs = zip(stack[::2], stack[1::2]) # [(key, value), ...] + pairs = sorted(pairs, key=lambda pair: self._ReadKey(pair[0].Value)) + + del self._stack[start:] + for pair in pairs: + self._stack.extend(pair) + + keys = self._CreateVector(self._stack[start::2], typed=True, fixed=False) + values = self._CreateVector( + self._stack[start + 1::2], typed=False, fixed=False, keys=keys) + + del self._stack[start:] + self._stack.append(values) + return values.Value + + @contextlib.contextmanager + def Map(self, key=None): + if key: + self.Key(key) + + try: + start = self._StartMap() + yield self + finally: + self._EndMap(start) + + def MapFromElements(self, elements): + start = self._StartMap() + for k, v in elements.items(): + self.Key(k) + self.Add(v) + self._EndMap(start) + + def Adder(self, type_): + return { + Type.BOOL: self.Bool, + Type.INT: self.Int, + Type.INDIRECT_INT: self.IndirectInt, + Type.UINT: self.UInt, + Type.INDIRECT_UINT: self.IndirectUInt, + Type.FLOAT: self.Float, + Type.INDIRECT_FLOAT: self.IndirectFloat, + Type.KEY: self.Key, + Type.BLOB: self.Blob, + Type.STRING: self.String, + }[type_] + + @InMapForString + def Add(self, value): + """Encodes value of any supported type.""" + if value is None: + self.Null() + elif isinstance(value, bool): + self.Bool(value) + elif isinstance(value, int): + self.Int(value) + elif isinstance(value, float): + self.Float(value) + elif isinstance(value, str): + self.String(value) + elif isinstance(value, (bytes, bytearray)): + self.Blob(value) + elif isinstance(value, dict): + with self.Map(): + for k, v in value.items(): + self.Key(k) + self.Add(v) + elif isinstance(value, array.array): + self.TypedVectorFromElements(value) + elif _IsIterable(value): + self.VectorFromElements(value) + else: + raise TypeError('unsupported python type: %s' % type(value)) + + @property + def LastValue(self): + return self._stack[-1] + + @InMap + def ReuseValue(self, value): + self._stack.append(value) + + +def GetRoot(buf): + """Returns root `Ref` object for the given buffer.""" + if len(buf) < 3: + raise ValueError('buffer is too small') + byte_width = buf[-1] + return Ref.PackedType( + Buf(buf, -(2 + byte_width)), byte_width, packed_type=buf[-2]) + + +def Dumps(obj): + """Returns bytearray with the encoded python object.""" + fbb = Builder() + fbb.Add(obj) + return fbb.Finish() + + +def Loads(buf): + """Returns python object decoded from the buffer.""" + return GetRoot(buf).Value diff --git a/tests/PythonTest.sh b/tests/PythonTest.sh index 17133b0..ebe49bf 100755 --- a/tests/PythonTest.sh +++ b/tests/PythonTest.sh @@ -35,6 +35,11 @@ function run_tests() { COMPARE_GENERATED_TO_GO=0 \ COMPARE_GENERATED_TO_JAVA=0 \ $1 py_test.py $2 $3 $4 + if [ $1 = python3 ]; then + PYTHONDONTWRITEBYTECODE=1 \ + PYTHONPATH=${runtime_library_dir}:${gen_code_path} \ + $1 py_flexbuffers_test.py + fi interpreters_tested+=(${1}) echo fi diff --git a/tests/py_flexbuffers_test.py b/tests/py_flexbuffers_test.py new file mode 100644 index 0000000..6696b3e --- /dev/null +++ b/tests/py_flexbuffers_test.py @@ -0,0 +1,1523 @@ +# Lint as: python3 +# Copyright 2020 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for flexbuffers.py.""" + +import array +import os.path +import struct +import unittest + +from flatbuffers import flexbuffers + +Type = flexbuffers.Type + +LOG2 = {1: 0, 2: 1, 4: 2, 8: 3} + +GOLD_FLEXBUFFER_OBJ = { + 'bar': [1, 2, 3], + 'bar3': [1, 2, 3], + 'bool': True, + 'bools': [True, False, True, False], + 'foo': 100.0, + 'mymap': {'foo': 'Fred'}, + 'vec': [-100, 'Fred', 4.0, b'M', False, 4.0] +} + +GOLD_FLEXBUFFER_FILE = 'gold_flexbuffer_example.bin' + + +def read_test_file(name): + with open(os.path.join(os.path.dirname(__file__), name), 'rb') as f: + return f.read() + + +def packed_type(type_, i): + return (type_ << 2) | LOG2[i] + + +def uint_size(value): + """Returns number of bytes (power of two) to represent unsigned value.""" + assert value >= 0 + + n = 8 + while not value < (1 << n): + n *= 2 + return n // 8 + + +def int_size(value): + """Returns number of bytes (power of two) to represent signed value.""" + n = 8 + while not -(1 << (n - 1)) <= value < (1 << (n - 1)): + n *= 2 + return n // 8 + + +def uint_sizes(value): + return tuple(1 << i for i in range(LOG2[uint_size(value)], 4)) + + +def int_sizes(value): + return tuple(1 << i for i in range(LOG2[int_size(value)], 4)) + + +def int_bytes(value, byte_width): + return struct.pack({1: 'b', 2: 'h', 4: 'i', 8: 'q'}[byte_width], value) + + +def uint_bytes(value, byte_width): + return struct.pack({1: 'B', 2: 'H', 4: 'I', 8: 'Q'}[byte_width], value) + + +def float_bytes(value, byte_width): + return struct.pack({4: 'f', 8: 'd'}[byte_width], value) + + +def min_value(type_, byte_width): + assert byte_width > 0 + + if type_ in (Type.INT, Type.INDIRECT_INT): + return -(1 << (8 * byte_width - 1)) + elif type_ in (Type.UINT, Type.INDIRECT_UINT): + return 0 + else: + raise ValueError('Unsupported type %s' % type_) + + +def max_value(type_, byte_width): + assert byte_width > 0 + + if type_ in (Type.INT, Type.INDIRECT_INT): + return (1 << (8 * byte_width - 1)) - 1 + elif type_ in (Type.UINT, Type.INDIRECT_UINT): + return (1 << 8 * byte_width) - 1 + else: + raise ValueError('Unsupported type %s' % type_) + + +def str_bytes(value, byte_width): + value_bytes = value.encode('utf-8') + return [*uint_bytes(len(value_bytes), byte_width), *value_bytes, 0] + + +def key_bytes(value): + return [*value.encode('ascii'), 0] + + +def encode_type(type_, value, byte_width=None): + fbb = flexbuffers.Builder() + add = fbb.Adder(type_) + if byte_width: + add(value, byte_width) + else: + add(value) + return fbb.Finish() + + +INT_MIN_MAX_VALUES = (min_value(Type.INT, 1), max_value(Type.INT, 1), + min_value(Type.INT, 2), max_value(Type.INT, 2), + min_value(Type.INT, 4), max_value(Type.INT, 4), + min_value(Type.INT, 8), max_value(Type.INT, 8)) + +UINT_MIN_MAX_VALUES = (0, max_value(Type.UINT, 1), max_value(Type.UINT, 2), + max_value(Type.UINT, 4), max_value(Type.UINT, 8)) + + +class UtilTest(unittest.TestCase): + """Tests to check FlexBuffer utility functions.""" + + def _test_type_predicate(self, pred, types): + for type_ in types: + with self.subTest(type=type_, pred=pred): + self.assertTrue(pred(type_)) + + for type_ in set(Type).difference(types): + with self.subTest(type=type_, pred=pred): + self.assertFalse(pred(type_)) + + def test_inline_types(self): + self._test_type_predicate( + Type.IsInline, (Type.NULL, Type.INT, Type.UINT, Type.FLOAT, Type.BOOL)) + + def test_typed_vector(self): + self._test_type_predicate( + Type.IsTypedVector, + (Type.VECTOR_INT, Type.VECTOR_UINT, Type.VECTOR_FLOAT, Type.VECTOR_KEY, + Type.VECTOR_STRING_DEPRECATED, Type.VECTOR_BOOL)) + + self._test_type_predicate( + Type.IsTypedVectorElementType, + (Type.INT, Type.UINT, Type.FLOAT, Type.KEY, Type.STRING, Type.BOOL)) + + with self.assertRaises(ValueError): + Type.ToTypedVectorElementType(Type.VECTOR) + self.assertIs(Type.ToTypedVectorElementType(Type.VECTOR_INT), Type.INT) + self.assertIs(Type.ToTypedVectorElementType(Type.VECTOR_UINT), Type.UINT) + self.assertIs(Type.ToTypedVectorElementType(Type.VECTOR_FLOAT), Type.FLOAT) + self.assertIs(Type.ToTypedVectorElementType(Type.VECTOR_KEY), Type.KEY) + self.assertIs( + Type.ToTypedVectorElementType(Type.VECTOR_STRING_DEPRECATED), + Type.STRING) + self.assertIs(Type.ToTypedVectorElementType(Type.VECTOR_BOOL), Type.BOOL) + + with self.assertRaises(ValueError): + Type.ToTypedVector(Type.VECTOR) + self.assertIs(Type.ToTypedVector(Type.INT), Type.VECTOR_INT) + self.assertIs(Type.ToTypedVector(Type.UINT), Type.VECTOR_UINT) + self.assertIs(Type.ToTypedVector(Type.FLOAT), Type.VECTOR_FLOAT) + self.assertIs(Type.ToTypedVector(Type.KEY), Type.VECTOR_KEY) + self.assertIs( + Type.ToTypedVector(Type.STRING), Type.VECTOR_STRING_DEPRECATED) + self.assertIs(Type.ToTypedVector(Type.BOOL), Type.VECTOR_BOOL) + + def test_fixed_typed_vector(self): + self._test_type_predicate( + Type.IsFixedTypedVector, + (Type.VECTOR_INT2, Type.VECTOR_UINT2, Type.VECTOR_FLOAT2, + Type.VECTOR_INT3, Type.VECTOR_UINT3, Type.VECTOR_FLOAT3, + Type.VECTOR_INT4, Type.VECTOR_UINT4, Type.VECTOR_FLOAT4)) + + self._test_type_predicate(Type.IsFixedTypedVectorElementType, + (Type.INT, Type.UINT, Type.FLOAT)) + + self.assertEqual( + Type.ToFixedTypedVectorElementType(Type.VECTOR_INT2), (Type.INT, 2)) + self.assertEqual( + Type.ToFixedTypedVectorElementType(Type.VECTOR_UINT2), (Type.UINT, 2)) + self.assertEqual( + Type.ToFixedTypedVectorElementType(Type.VECTOR_FLOAT2), (Type.FLOAT, 2)) + self.assertEqual( + Type.ToFixedTypedVectorElementType(Type.VECTOR_INT3), (Type.INT, 3)) + self.assertEqual( + Type.ToFixedTypedVectorElementType(Type.VECTOR_UINT3), (Type.UINT, 3)) + self.assertEqual( + Type.ToFixedTypedVectorElementType(Type.VECTOR_FLOAT3), (Type.FLOAT, 3)) + self.assertEqual( + Type.ToFixedTypedVectorElementType(Type.VECTOR_INT4), (Type.INT, 4)) + self.assertEqual( + Type.ToFixedTypedVectorElementType(Type.VECTOR_UINT4), (Type.UINT, 4)) + self.assertEqual( + Type.ToFixedTypedVectorElementType(Type.VECTOR_FLOAT4), (Type.FLOAT, 4)) + + # Invalid size + for type_ in Type.INT, Type.UINT, Type.FLOAT: + with self.assertRaises(ValueError): + Type.ToTypedVector(type_, 1) + with self.assertRaises(ValueError): + Type.ToTypedVector(type_, 5) + + # Invalid element type + for length in 1, 2, 3, 4, 5: + with self.assertRaises(ValueError): + Type.ToTypedVector(Type.STRING, length) + + self.assertIs(Type.ToTypedVector(Type.INT, 2), Type.VECTOR_INT2) + self.assertIs(Type.ToTypedVector(Type.INT, 3), Type.VECTOR_INT3) + self.assertIs(Type.ToTypedVector(Type.INT, 4), Type.VECTOR_INT4) + + self.assertIs(Type.ToTypedVector(Type.UINT, 2), Type.VECTOR_UINT2) + self.assertIs(Type.ToTypedVector(Type.UINT, 3), Type.VECTOR_UINT3) + self.assertIs(Type.ToTypedVector(Type.UINT, 4), Type.VECTOR_UINT4) + + self.assertIs(Type.ToTypedVector(Type.FLOAT, 2), Type.VECTOR_FLOAT2) + self.assertIs(Type.ToTypedVector(Type.FLOAT, 3), Type.VECTOR_FLOAT3) + self.assertIs(Type.ToTypedVector(Type.FLOAT, 4), Type.VECTOR_FLOAT4) + + def test_width(self): + for x in range(1 << 10): + self.assertEqual(flexbuffers.BitWidth.U(x), LOG2[uint_size(x)]) + + for x in range(-(1 << 10), 1 << 10): + self.assertEqual(flexbuffers.BitWidth.I(x), LOG2[int_size(x)]) + + def test_padding(self): + self.assertEqual(flexbuffers._PaddingBytes(0, 4), 0) + self.assertEqual(flexbuffers._PaddingBytes(0, 8), 0) + self.assertEqual(flexbuffers._PaddingBytes(0, 16), 0) + + self.assertEqual(flexbuffers._PaddingBytes(1, 8), 7) + self.assertEqual(flexbuffers._PaddingBytes(17, 8), 7) + + self.assertEqual(flexbuffers._PaddingBytes(42, 2), 0) + + +class DecoderTest(unittest.TestCase): + """Tests to check FlexBuffer decoding functions. + + Common variable names used in the tests for compactness: + bw: byte_width + ebw: element_byte_width + kbw: key_byte_width + vbw: value_byte_width + tbw: type_byte_width + + Having '_ignored' suffix means that variable doesn't affect the constructed + byte buffer size. + """ + + def test_null(self): + for bw in 1, 2, 4, 8: + for ebw_ignored in 1, 2, 4, 8: + with self.subTest(bw=bw, ebw_ignored=ebw_ignored): + data = bytes([ + *uint_bytes(0, bw), + packed_type(Type.NULL, ebw_ignored), + bw, + ]) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsNull) + self.assertEqual(root.AsBool, False) + self.assertEqual(root.AsInt, 0) + self.assertEqual(root.AsFloat, 0.0) + + for prop in (type(root).AsKey, type(root).AsString, type(root).AsBlob, + type(root).AsVector, type(root).AsTypedVector, + type(root).AsFixedTypedVector, type(root).AsMap): + with self.assertRaises(TypeError): + prop.fget(root) + + self.assertEqual(root.Value, None) + + self.assertIsNone(flexbuffers.Loads(data)) + + def test_bool(self): + for value in False, True: + for bw in 1, 2, 4, 8: + for ebw_ignored in 1, 2, 4, 8: + with self.subTest(bw=bw, ebw_ignored=ebw_ignored): + data = bytes([ + *uint_bytes(int(value), bw), + packed_type(Type.BOOL, ebw_ignored), + bw, + ]) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsBool) + self.assertEqual(root.AsBool, value) + self.assertEqual(root.AsInt, int(value)) + self.assertEqual(root.AsFloat, float(value)) + + for prop in (type(root).AsKey, type(root).AsString, + type(root).AsBlob, + type(root).AsVector, type(root).AsTypedVector, + type(root).AsFixedTypedVector, type(root).AsMap): + with self.assertRaises(TypeError): + prop.fget(root) + + self.assertEqual(root.Value, value) + + self.assertEqual(flexbuffers.Loads(data), value) + + def test_mutate_bool(self): + root = flexbuffers.GetRoot(flexbuffers.Dumps(True)) + self.assertTrue(root.IsBool) + self.assertTrue(root.AsBool) + + self.assertTrue(root.MutateBool(False)) + self.assertTrue(root.IsBool) + self.assertFalse(root.AsBool) + + self.assertTrue(root.MutateBool(True)) + self.assertTrue(root.IsBool) + self.assertTrue(root.AsBool) + + def _check_int(self, data, value): + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsInt) + self.assertEqual(root.AsInt, value) + self.assertEqual(root.AsBool, bool(value)) + self.assertEqual(root.AsFloat, float(value)) + + for prop in (type(root).AsKey, type(root).AsString, type(root).AsBlob, + type(root).AsVector, type(root).AsTypedVector, + type(root).AsFixedTypedVector, type(root).AsMap): + with self.assertRaises(TypeError): + prop.fget(root) + + self.assertEqual(root.Value, value) + + self.assertEqual(flexbuffers.Loads(data), value) + + def test_int(self): + for value in (0, 1, -1, 15, -17, *INT_MIN_MAX_VALUES): + for bw in int_sizes(value): + for ebw_ignored in 1, 2, 4, 8: + with self.subTest(value=value, bw=bw, ebw_ignored=ebw_ignored): + data = bytes([ + *int_bytes(value, bw), + packed_type(Type.INT, ebw_ignored), + bw, + ]) + + self._check_int(data, value) + + def test_indirect_int(self): + for value in (0, 1, -1, 15, -17, *INT_MIN_MAX_VALUES): + for bw in 1, 2, 4, 8: + for ebw in int_sizes(value): + with self.subTest(value=value, bw=bw, ebw=ebw): + data = bytes([ + # Int + *int_bytes(value, ebw), + # Root + *uint_bytes(ebw, bw), + packed_type(Type.INDIRECT_INT, ebw), + bw, + ]) + self._check_int(data, value) + + def test_uint(self): + for value in (1, *UINT_MIN_MAX_VALUES): + for bw in uint_sizes(value): + for ebw_ignored in 1, 2, 4, 8: + with self.subTest(value=value, bw=bw, ebw_ignored=ebw_ignored): + data = bytes([ + *uint_bytes(value, bw), + packed_type(Type.UINT, ebw_ignored), + bw, + ]) + + self._check_int(data, value) + + def test_inidirect_uint(self): + for value in (1, *UINT_MIN_MAX_VALUES): + for bw in 1, 2, 4, 8: + for ebw in uint_sizes(value): + with self.subTest(value=value, bw=bw, ebw=ebw): + data = bytes([ + # UInt + *uint_bytes(value, ebw), + # Root + *uint_bytes(ebw, bw), + packed_type(Type.INDIRECT_UINT, ebw), + bw, + ]) + + self._check_int(data, value) + + def test_mutate_ints(self): + # Signed + for type_ in Type.INT, Type.INDIRECT_INT: + with self.subTest(type=type_): + root = flexbuffers.GetRoot(encode_type(type_, 56)) + self.assertEqual(root.AsInt, 56) + + for new_value in 0, 1, -1, -128, 127: + self.assertTrue(root.MutateInt(new_value)) + self.assertEqual(root.AsInt, new_value) + + for new_value in -129, 128: + self.assertFalse(root.MutateInt(new_value)) + + # Unsigned + for type_ in Type.UINT, Type.INDIRECT_UINT: + with self.subTest(type=type_): + root = flexbuffers.GetRoot(encode_type(type_, 1)) + self.assertEqual(root.AsInt, 1) + + for new_value in 0, 1, 255: + self.assertTrue(root.MutateInt(new_value)) + self.assertEqual(root.AsInt, new_value) + + self.assertFalse(root.MutateInt(256)) + + # Inside vector + fbb = flexbuffers.Builder() + fbb.VectorFromElements([13, 0, -15]) + data = fbb.Finish() + + self.assertEqual(flexbuffers.Loads(data), [13, 0, -15]) + self.assertTrue(flexbuffers.GetRoot(data).AsVector[0].MutateInt(0)) + self.assertTrue(flexbuffers.GetRoot(data).AsVector[1].MutateInt(-7)) + self.assertTrue(flexbuffers.GetRoot(data).AsVector[2].MutateInt(45)) + self.assertEqual(flexbuffers.Loads(data), [0, -7, 45]) + + # Inside map + fbb = flexbuffers.Builder() + fbb.MapFromElements({'x': -7, 'y': 46}) + data = fbb.Finish() + + self.assertEqual(flexbuffers.Loads(data), {'x': -7, 'y': 46}) + self.assertTrue(flexbuffers.GetRoot(data).AsMap['x'].MutateInt(14)) + self.assertTrue(flexbuffers.GetRoot(data).AsMap['y'].MutateInt(-1)) + self.assertEqual(flexbuffers.Loads(data), {'x': 14, 'y': -1}) + + def _check_float(self, data, value): + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsFloat) + self.assertAlmostEqual(root.AsFloat, value) + + for prop in (type(root).AsKey, type(root).AsString, type(root).AsBlob, + type(root).AsVector, type(root).AsTypedVector, + type(root).AsFixedTypedVector, type(root).AsMap): + with self.assertRaises(TypeError): + prop.fget(root) + + self.assertAlmostEqual(root.Value, value) + + self.assertAlmostEqual(flexbuffers.Loads(data), value) + + def test_float(self): + for value in -1.0, 0.0, 1.0, 3.141592, 1.5e6: + for bw in 4, 8: + for ebw_ignored in 1, 2, 4, 8: + with self.subTest(value=value, bw=bw, ebw_ignored=ebw_ignored): + data = bytes([ + *float_bytes(value, bw), + packed_type(Type.FLOAT, ebw_ignored), + bw, + ]) + + self._check_float(data, value) + + def test_indirect_float(self): + for value in -1.0, 0.0, 1.0, 3.141592, 1.5e6: + for bw in 1, 2, 4, 8: + for ebw in 4, 8: + with self.subTest(value=value, bw=bw, ebw=ebw): + data = bytes([ + # Float + *float_bytes(value, ebw), + # Root + *uint_bytes(ebw, bw), + packed_type(Type.INDIRECT_FLOAT, ebw), + bw, + ]) + + self._check_float(data, value) + + def test_mutate_float(self): + for type_ in Type.FLOAT, Type.INDIRECT_FLOAT: + for bw in 4, 8: + value = 3.141592 + root = flexbuffers.GetRoot(encode_type(type_, value, bw)) + self.assertAlmostEqual(root.AsFloat, value) + + value = 2.71828 + self.assertTrue(root.MutateFloat(value)) + self.assertAlmostEqual(root.AsFloat, value, places=5) + + # Inside vector + data = flexbuffers.Dumps([2.4, 1.5, -7.2]) + + self.assertTrue(flexbuffers.GetRoot(data).AsVector[0].MutateFloat(0.0)) + self.assertTrue(flexbuffers.GetRoot(data).AsVector[1].MutateFloat(15.2)) + self.assertTrue(flexbuffers.GetRoot(data).AsVector[2].MutateFloat(-5.1)) + + for a, b in zip(flexbuffers.Loads(data), [0.0, 15.2, -5.1]): + self.assertAlmostEqual(a, b) + + def test_string(self): + for value in 'red', 'green', 'blue', 'flatbuffers + flexbuffers': + value_bytes = value.encode('utf-8') + for bw in 1, 2, 4, 8: + for lbw in 1, 2, 4, 8: + with self.subTest(bw=bw, lbw=lbw): + data = bytes([ + # String + *uint_bytes(len(value_bytes), lbw), + *value_bytes, + 0, + # Root + *uint_bytes(len(value_bytes) + 1, bw), # offset + packed_type(Type.STRING, lbw), + bw, + ]) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsString) + self.assertEqual(root.AsString, value) + self.assertEqual(root.Value, value) + self.assertEqual(root.AsInt, len(value)) + + self.assertEqual(flexbuffers.Loads(data), value) + + def test_mutate_string(self): + data = encode_type(Type.STRING, '12345') + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsString) + self.assertEqual(root.AsString, '12345') + + self.assertFalse(root.MutateString('543210')) + + self.assertTrue(root.MutateString('54321')) + self.assertTrue(root.IsString) + self.assertEqual(root.AsString, '54321') + + self.assertTrue(root.MutateString('543')) + self.assertTrue(root.IsString) + self.assertEqual(root.AsString, '543') + + self.assertFalse(root.MutateString('54321')) + + def test_empty_blob(self): + for bw in 1, 2, 4, 8: + for lbw in 1, 2, 4, 8: + with self.subTest(bw=bw, lbw=lbw): + data = bytes([ + # Blob + *uint_bytes(0, lbw), + # Root + *uint_bytes(0, bw), + packed_type(Type.BLOB, lbw), + bw, + ]) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsBlob) + self.assertEqual(root.AsBlob, bytes()) + self.assertEqual(root.Value, bytes()) + self.assertEqual(flexbuffers.Loads(data), bytes()) + + def test_blob(self): + for blob in [], [215], [23, 75, 124, 0, 45, 15], 255 * [0]: + for bw in 1, 2, 4, 8: + for lbw in 1, 2, 4, 8: + with self.subTest(blob=blob, bw=bw, lbw=lbw): + data = bytes([ + # Blob + *uint_bytes(len(blob), lbw), + *blob, + # Root + *uint_bytes(len(blob), bw), + packed_type(Type.BLOB, lbw), + bw, + ]) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsBlob) + self.assertEqual(root.AsBlob, bytes(blob)) + self.assertEqual(root.Value, bytes(blob)) + self.assertEqual(flexbuffers.Loads(data), bytes(blob)) + + def test_key(self): + for value in '', 'x', 'color': + for bw in 1, 2, 4, 8: + with self.subTest(value=value, bw=bw): + value_bytes = value.encode('ascii') + data = bytes([ + # Key + *value_bytes, + 0, + # Root + *uint_bytes(len(value_bytes) + 1, bw), + packed_type(Type.KEY, 1), + bw, + ]) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsKey) + self.assertEqual(root.AsKey, value) + self.assertEqual(root.Value, value) + self.assertEqual(flexbuffers.Loads(data), value) + + def _check_fixed_typed_vector(self, data, vector, type_): + self.assertEqual(flexbuffers.Loads(data), vector) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsFixedTypedVector) + + v = root.AsFixedTypedVector + self.assertEqual(len(v), len(vector)) + self.assertIs(v.ElementType, type_) + self.assertEqual([e.Value for e in v], vector) + self.assertSequenceEqual(v.Value, vector) + + self.assertEqual(root.AsInt, len(vector)) + + def test_fixed_typed_vector_float(self): + for type_, vector in ((Type.VECTOR_FLOAT2, [-75.0, 34.89]), + (Type.VECTOR_FLOAT3, [-75.0, 34.89, 12.0]), + (Type.VECTOR_FLOAT4, [-75.0, 34.89, -1.0, 1.0])): + for bw in 1, 2, 4, 8: + for ebw in 4, 8: + with self.subTest(type=type_, vector=vector, bw=bw, ebw=ebw): + data = bytes([ + # FixedTypedVector + *b''.join(float_bytes(e, ebw) for e in vector), + # Root + *uint_bytes(len(vector) * ebw, bw), + packed_type(type_, ebw), + bw, + ]) + + for a, b in zip(flexbuffers.Loads(data), vector): + self.assertAlmostEqual(a, b, places=2) + + def test_fixed_typed_vector_int(self): + for type_, vector in ((Type.VECTOR_INT2, [0, -13]), (Type.VECTOR_INT3, + [127, 0, -13]), + (Type.VECTOR_INT4, [127, 0, -13, 0])): + for bw in 1, 2, 4, 8: + for ebw in 1, 2, 4, 8: + with self.subTest(type=type_, vector=vector, bw=bw, ebw=ebw): + data = bytes([ + # FixedTypeVector + *b''.join(int_bytes(e, ebw) for e in vector), + # Root + *uint_bytes(ebw * len(vector), bw), + packed_type(type_, ebw), + bw, + ]) + + self._check_fixed_typed_vector(data, vector, Type.INT) + + def test_fixed_typed_vector_uint(self): + for type_, vector in ((Type.VECTOR_UINT2, [0, 13]), + (Type.VECTOR_UINT3, [127, 0, 13]), (Type.VECTOR_UINT4, + [127, 0, 13, 0])): + for bw in 1, 2, 4, 8: + for ebw in 1, 2, 4, 8: + with self.subTest(type=type_, vector=vector, bw=bw, ebw=ebw): + data = bytes([ + # FixedTypeVector + *b''.join(uint_bytes(e, ebw) for e in vector), + # Root + *uint_bytes(ebw * len(vector), bw), + packed_type(type_, ebw), + bw, + ]) + + self._check_fixed_typed_vector(data, vector, Type.UINT) + + def _check_typed_vector(self, data, vector, type_): + self.assertEqual(flexbuffers.Loads(data), vector) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsTypedVector) + + v = root.AsTypedVector + self.assertIs(v.ElementType, type_) + self.assertEqual(len(v), len(vector)) + self.assertEqual([e.Value for e in v], vector) + self.assertSequenceEqual(v.Value, vector) + + self.assertEqual(root.AsInt, len(vector)) + + def test_empty_typed_vector(self): + for type_ in (Type.VECTOR_BOOL, Type.VECTOR_INT, Type.VECTOR_UINT, + Type.VECTOR_FLOAT, Type.VECTOR_KEY, + Type.VECTOR_STRING_DEPRECATED): + for bw in 1, 2, 4, 8: + for ebw in 1, 2, 4, 8: + with self.subTest(type=type_, bw=bw, ebw=ebw): + data = bytes([ + # TypedVector[type_] + *uint_bytes(0, ebw), + # Root + *uint_bytes(0, bw), + packed_type(type_, ebw), + bw + ]) + + element_type = Type.ToTypedVectorElementType(type_) + if element_type == Type.STRING: + element_type = Type.KEY + self._check_typed_vector(data, [], element_type) + + def test_typed_vector_bool(self): + vector = [True, False, False, False, True] + + for bw in 1, 2, 4, 8: + for ebw in 1, 2, 4, 8: + with self.subTest(bw=bw, ebw=ebw): + data = bytes([ + # TypedVector[Type.BOOL] + *uint_bytes(len(vector), ebw), + *b''.join(uint_bytes(int(e), ebw) for e in vector), + # Root + *uint_bytes(len(vector) * ebw, bw), + packed_type(Type.VECTOR_BOOL, ebw), + bw, + ]) + self._check_typed_vector(data, vector, Type.BOOL) + + def test_typed_vector_int(self): + vector = [-100, 200, -300] + + for bw in 1, 2, 4, 8: + for ebw in 2, 4, 8: + with self.subTest(bw=bw, ebw=ebw): + data = bytes([ + # TypedVector[Type.INT] + *uint_bytes(len(vector), ebw), + *b''.join(int_bytes(e, ebw) for e in vector), + # Root + *uint_bytes(len(vector) * ebw, bw), + packed_type(Type.VECTOR_INT, ebw), + bw, + ]) + self._check_typed_vector(data, vector, Type.INT) + + def test_typed_vector_uint(self): + vector = [100, 200, 300, 400, 0] + + for bw in 1, 2, 4, 8: + for ebw in 2, 4, 8: + with self.subTest(bw=bw, ebw=ebw): + data = bytes([ + # TypedVector[Type.UINT] + *uint_bytes(len(vector), ebw), + *b''.join(int_bytes(e, ebw) for e in vector), + # Root + *uint_bytes(len(vector) * ebw, bw), + packed_type(Type.VECTOR_UINT, ebw), + bw, + ]) + self._check_typed_vector(data, vector, Type.UINT) + + def test_typed_vector_float(self): + vector = [3.64, -6.36, 3.14, 634.0, -42.0] + + for bw in 1, 2, 4, 8: + for ebw in 4, 8: + with self.subTest(bw=bw, ebw=ebw): + data = bytes([ + # TypedVector[Type.FLOAT] + *uint_bytes(len(vector), ebw), + *b''.join(float_bytes(e, ebw) for e in vector), + # Root + *uint_bytes(ebw * len(vector), bw), + packed_type(Type.VECTOR_FLOAT, ebw), + bw, + ]) + + for a, b in zip(flexbuffers.Loads(data), vector): + self.assertAlmostEqual(a, b, places=2) + + def test_typed_vector_key(self): + vector = ['red', 'green', 'blue'] + + for bw in 1, 2, 4, 8: + for ebw in 1, 2, 4, 8: + with self.subTest(bw=bw, ebw=ebw): + data = bytes([ + # Keys + *key_bytes(vector[0]), + *key_bytes(vector[1]), + *key_bytes(vector[2]), + # TypedVector[Type.KEY] + *uint_bytes(len(vector), ebw), + *uint_bytes(15 + 1 * ebw, ebw), # offset to vector[0] + *uint_bytes(11 + 2 * ebw, ebw), # offset to vector[1] + *uint_bytes(5 + 3 * ebw, ebw), # offset to vector[2] + # Root + *uint_bytes(len(vector) * ebw, bw), # offset to vector + packed_type(Type.VECTOR_KEY, ebw), + bw, + ]) + self._check_typed_vector(data, vector, Type.KEY) + + def test_typed_vector_string(self): + vector = ['red', 'green', 'blue'] + + for bw in 1, 2, 4, 8: + for ebw in 1, 2, 4, 8: + with self.subTest(bw=bw, ebw=ebw): + data = bytes([ + # Strings + *str_bytes(vector[0], 1), # 5 bytes + *str_bytes(vector[1], 1), # 7 bytes + *str_bytes(vector[2], 1), # 6 bytes + # TypedVector[Type.STRING] + *uint_bytes(len(vector), ebw), + *uint_bytes(17 + 1 * ebw, ebw), # offset to vector[0] + *uint_bytes(12 + 2 * ebw, ebw), # offset to vector[1] + *uint_bytes(5 + 3 * ebw, ebw), # offset to vector[2] + # Root + *uint_bytes(len(vector) * ebw, bw), # offset to vector + packed_type(Type.VECTOR_STRING_DEPRECATED, ebw), + bw, + ]) + + # We have to pass Type.KEY because of Type.VECTOR_STRING_DEPRECATED. + self._check_typed_vector(data, vector, Type.KEY) + + def test_typed_vector_string_deprecated(self): + # Check FlexBuffersDeprecatedTest() inside test.cpp for details. + vector = [300 * 'A', 'test'] + + fbb = flexbuffers.Builder() + with fbb.TypedVector(): + for e in vector: + fbb.String(e) + data = fbb.Finish() + + # We have to pass Type.KEY because of Type.VECTOR_STRING_DEPRECATED. + self._check_typed_vector(data, vector, Type.KEY) + + def test_typed_vector_invalid(self): + fbb = flexbuffers.Builder() + + with self.assertRaises(RuntimeError): + fbb.TypedVectorFromElements(['string', 423]) + + def test_empty_vector(self): + for bw in 1, 2, 4, 8: + for ebw in 1, 2, 4, 8: + data = bytes([ + *uint_bytes(0, ebw), + # Root + *uint_bytes(0, bw), + packed_type(Type.VECTOR, ebw), + bw, + ]) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsVector) + self.assertEqual(len(root.AsVector), 0) + + self.assertEqual(flexbuffers.Loads(data), []) + + def test_vector1(self): + vector = [300, 400, 500] + + for bw in 1, 2, 4, 8: + for ebw in 2, 4, 8: + for tbw_ignored in 1, 2, 4, 8: + with self.subTest(bw=bw, ebw=ebw, ignore=tbw_ignored): + data = bytes([ + # Vector length + *uint_bytes(len(vector), ebw), + # Vector elements + *int_bytes(vector[0], ebw), + *int_bytes(vector[1], ebw), + *int_bytes(vector[2], ebw), + # Vector types + packed_type(Type.INT, tbw_ignored), + packed_type(Type.INT, tbw_ignored), + packed_type(Type.INT, tbw_ignored), + # Root + *uint_bytes(ebw * len(vector) + len(vector), bw), + packed_type(Type.VECTOR, ebw), + bw, + ]) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsVector) + self.assertFalse(root.IsMap) + + v = root.AsVector + self.assertEqual(len(v), len(vector)) + + for i in range(len(v)): + self.assertTrue(v[i].IsInt) + self.assertEqual(v[i].AsInt, vector[i]) + + for i, e in enumerate(v): + self.assertTrue(e.IsInt) + self.assertEqual(e.AsInt, vector[i]) + + with self.assertRaises(IndexError): + v[-1].AsInt # pylint: disable=pointless-statement + + with self.assertRaises(IndexError): + v[3].AsInt # pylint: disable=pointless-statement + + with self.assertRaises(TypeError): + root.AsMap # pylint: disable=pointless-statement + + self.assertEqual(root.AsInt, len(vector)) + self.assertEqual(root.AsFloat, float(len(vector))) + + self.assertEqual(flexbuffers.Loads(data), vector) + + def test_vector2(self): + vector = [1984, 'August', True] + + for bw in 1, 2, 4, 8: + with self.subTest(bw=bw): + data = bytes([ + *str_bytes(vector[1], 1), + # Vector + *uint_bytes(len(vector), 2), + *int_bytes(vector[0], 2), + *uint_bytes(11, 2), # offset to 'August' + *uint_bytes(int(vector[2]), 2), + packed_type(Type.INT, 2), + packed_type(Type.STRING, 1), + packed_type(Type.BOOL, 2), + # Root + *uint_bytes(2 * len(vector) + len(vector), bw), # offset to vector + packed_type(Type.VECTOR, 2), + bw, + ]) + self.assertEqual(flexbuffers.Loads(data), vector) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsVector) + + v = root.AsVector + self.assertTrue(v[0].IsInt) + self.assertEqual(v[0].AsInt, 1984) + + self.assertTrue(v[1].IsString) + self.assertEqual(v[1].AsString, 'August') + + self.assertTrue(v[2].IsBool) + self.assertTrue(v[2].AsBool) + + self.assertEqual(v.Value, vector) + + self.assertEqual(root.AsInt, len(vector)) + + def test_empty_map(self): + for bw in 1, 2, 4, 8: + for kbw in 1, 2, 4, 8: + for vbw in 1, 2, 4, 8: + data = bytes([ + *uint_bytes(0, kbw), # Keys length + *uint_bytes(0, vbw), + *uint_bytes(kbw, vbw), + *uint_bytes(0, vbw), # Values length + # Root + *uint_bytes(0, bw), + packed_type(Type.MAP, vbw), + bw, + ]) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsMap) + self.assertEqual(len(root.AsMap), 0) + + self.assertEqual(flexbuffers.Loads(data), {}) + + def test_map(self): + value = {'foo': 13, 'bar': 14} + + for bw in 1, 2, 4, 8: + for kbw in 1, 2, 4, 8: + for vbw in 1, 2, 4, 8: + with self.subTest(kbw=kbw, vbw=vbw, bw=bw): + data = bytes([ + *key_bytes('foo'), # 4 bytes + *key_bytes('bar'), # 4 bytes + # Map + *uint_bytes(len(value), kbw), + *uint_bytes(4 + 1 * kbw, kbw), # offset to 'bar' + *uint_bytes(8 + 2 * kbw, kbw), # offset to 'foo' + *uint_bytes(len(value) * kbw, vbw), # offset to keys + *uint_bytes(kbw, vbw), + *uint_bytes(len(value), vbw), + *int_bytes(value['bar'], vbw), + *int_bytes(value['foo'], vbw), + packed_type(Type.INT, vbw), + packed_type(Type.INT, vbw), + # Root + *uint_bytes(vbw * len(value) + len(value), + bw), # offset to values + packed_type(Type.MAP, vbw), + bw, + ]) + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsMap) + + m = root.AsMap + self.assertEqual(len(m), 2) + self.assertEqual(m[0].AsInt, 14) + self.assertEqual(m[1].AsInt, 13) + + self.assertEqual(m['bar'].AsInt, 14) + self.assertEqual(m['foo'].AsInt, 13) + + for invalid_key in 'a', 'b', 'no': + with self.assertRaises(KeyError): + m[invalid_key] # pylint: disable=pointless-statement + + values = m.Values + self.assertEqual(len(values), 2) + self.assertEqual(values[0].AsInt, 14) + self.assertEqual(values[1].AsInt, 13) + + keys = m.Keys + self.assertEqual(len(keys), 2) + self.assertEqual(len(keys[0].AsKey), 3) + self.assertEqual(keys[0].AsKey, 'bar') + self.assertEqual(len(keys[1].AsKey), 3) + self.assertEqual(keys[1].AsKey, 'foo') + + keys = [key.AsKey for key in keys] + self.assertEqual(sorted(keys), keys) + + self.assertEqual(root.AsInt, len(value)) + + self.assertEqual(flexbuffers.Loads(data), value) + + def test_alignment(self): + value = ['test', 7] + + data = bytes([ + *key_bytes('test'), # 5 bytes: 'test' and \0 + 0, + 0, + 0, # 3 bytes: alignment + # Vector + *uint_bytes(len(value), byte_width=8), + *uint_bytes(16, byte_width=8), + *uint_bytes(7, byte_width=8), + packed_type(Type.KEY, 1), + packed_type(Type.INT, 8), + # Root + *uint_bytes(8 * len(value) + len(value), 1), + packed_type(Type.VECTOR, 8), + 1, + ]) + + self.assertEqual(flexbuffers.Loads(data), value) + + +class EncoderTest(unittest.TestCase): + """Tests to check FlexBuffer encoding functions.""" + + def test_null(self): + def encode_null(): + fbb = flexbuffers.Builder() + fbb.Null() + return fbb.Finish() + + self.assertIsNone(flexbuffers.Loads(encode_null())) + + def test_bool(self): + for value in False, True: + data = encode_type(Type.BOOL, value) + self.assertEqual(flexbuffers.Loads(data), value) + + def test_int(self): + for byte_width in 1, 2, 4, 8: + for type_ in Type.INT, Type.INDIRECT_INT, Type.UINT, Type.INDIRECT_UINT: + with self.subTest(byte_width=byte_width, type=type_): + value = min_value(type_, byte_width) + data = encode_type(type_, value) + self.assertEqual(flexbuffers.Loads(data), value) + + value = max_value(type_, byte_width) + data = encode_type(type_, value) + self.assertEqual(flexbuffers.Loads(data), value) + + def test_float(self): + for value in 3.141592, 7.62, 999.99: + for type_ in Type.FLOAT, Type.INDIRECT_FLOAT: + with self.subTest(value=value, type=type_): + data = encode_type(type_, value) + self.assertEqual(flexbuffers.Loads(data), value) + + data = encode_type(type_, value, 4) + self.assertAlmostEqual(flexbuffers.Loads(data), value, places=4) + + data = encode_type(type_, value, 8) + self.assertEqual(flexbuffers.Loads(data), value) + + def test_string(self): + for value in '', 'x', 'color', 'hello world': + with self.subTest(value=value): + data = encode_type(Type.STRING, value) + self.assertEqual(flexbuffers.Loads(data), value) + + def test_blob(self): + for value in bytes(), bytes([240, 12, 143, 7]), bytes(1000 * [17]): + with self.subTest(value=value): + data = encode_type(Type.BLOB, value) + self.assertEqual(flexbuffers.Loads(data), value) + + def test_key(self): + for value in '', 'color', 'hello world': + with self.subTest(value=value): + data = encode_type(Type.KEY, value) + self.assertEqual(flexbuffers.Loads(data), value) + + with self.assertRaises(ValueError): + encode_type(Type.KEY, (b'\x00' * 10).decode('ascii')) + + def test_vector(self): + + def encode_vector(elements, element_type): + fbb = flexbuffers.Builder() + with fbb.Vector(): + add = fbb.Adder(element_type) + for e in elements: + add(e) + return fbb.Finish() + + def encode_vector_from_elements(elements): + fbb = flexbuffers.Builder() + fbb.VectorFromElements(elements) + return fbb.Finish() + + for elements in [], [1435], [56, 23, 0, 6783]: + data = encode_vector(elements, Type.INT) + self.assertEqual(flexbuffers.Loads(data), elements) + + data = encode_vector_from_elements(elements) + self.assertEqual(flexbuffers.Loads(data), elements) + + # Elements of different type: one by one + elements = [56.0, 'flexbuffers', 0, False, 75123] + + fbb = flexbuffers.Builder() + with fbb.Vector(): + fbb.Float(elements[0]) + fbb.String(elements[1]) + fbb.UInt(elements[2], 8) + fbb.Bool(elements[3]) + fbb.Int(elements[4]) + data = fbb.Finish() + self.assertEqual(flexbuffers.Loads(data), elements) + + # Elements of different type: all at once + fbb = flexbuffers.Builder() + fbb.VectorFromElements(elements) + data = fbb.Finish() + self.assertEqual(flexbuffers.Loads(data), elements) + + def test_nested_vectors(self): + fbb = flexbuffers.Builder() + with fbb.Vector(): + fbb.String('begin') + fbb.IndirectInt(42) + with fbb.Vector(): + for i in range(5): + fbb.Int(i) + fbb.String('end') + data = fbb.Finish() + + self.assertEqual( + flexbuffers.Loads(data), ['begin', 42, [0, 1, 2, 3, 4], 'end']) + + def test_big_vector(self): + n = 10 * 1000 + fbb = flexbuffers.Builder() + with fbb.Vector(): + for i in range(n): + fbb.Int(i) + self.assertEqual(flexbuffers.Loads(fbb.Finish()), list(range(n))) + + def test_typed_vector(self): + + def encode_typed_vector_from_elements(elements, element_type=None): + fbb = flexbuffers.Builder() + fbb.TypedVectorFromElements(elements, element_type) + return fbb.Finish() + + for elements in [], [False], [True], [False, True, True, False, False]: + data = encode_typed_vector_from_elements(elements, Type.BOOL) + self.assertEqual(flexbuffers.Loads(data), elements) + + data = encode_typed_vector_from_elements(elements) + self.assertEqual(flexbuffers.Loads(data), elements) + + for elements in [], [23455], [351, -2, 0, 6783, 0, -10]: + data = encode_typed_vector_from_elements(elements, Type.INT) + self.assertEqual(flexbuffers.Loads(data), elements) + + data = encode_typed_vector_from_elements(elements) + self.assertEqual(flexbuffers.Loads(data), elements) + + for elements in [], [23455], [351, 2, 0, 6783, 0, 10]: + data = encode_typed_vector_from_elements(elements) + self.assertEqual(flexbuffers.Loads(data), elements) + + data = encode_typed_vector_from_elements(elements, Type.INT) + self.assertEqual(flexbuffers.Loads(data), elements) + + data = encode_typed_vector_from_elements(elements, Type.UINT) + self.assertEqual(flexbuffers.Loads(data), elements) + + for elements in [], [7.0], [52.0, 51.2, 70.0, -4.0]: + data = encode_typed_vector_from_elements(elements, Type.FLOAT) + self.assertEqual(flexbuffers.Loads(data), elements) + + data = encode_typed_vector_from_elements(elements) + self.assertEqual(flexbuffers.Loads(data), elements) + + for elements in [], ['color'], ['x', 'y']: + data = encode_typed_vector_from_elements(elements, Type.KEY) + self.assertEqual(flexbuffers.Loads(data), elements) + + data = encode_typed_vector_from_elements(elements) + self.assertEqual(flexbuffers.Loads(data), elements) + + def test_typed_vector_from_array(self): + + def encode_array(typecode, values): + fbb = flexbuffers.Builder() + fbb.VectorFromElements(array.array(typecode, values)) + return fbb.Finish() + + values = [1.0, 3.14, -2.54, 0.0] + data = encode_array('f', values) + for a, b in zip(flexbuffers.Loads(data), values): + self.assertAlmostEqual(a, b, places=2) + + values = [1.0, 3.14, -2.54, 0.0] + data = encode_array('d', values) + self.assertEqual(flexbuffers.Loads(data), values) + + values = [1, -7, 9, 26, 12] + data = encode_array('i', values) + self.assertEqual(flexbuffers.Loads(data), values) + + values = [0, 1, 2, 3, 4, 5, 6] + data = encode_array('I', values) + self.assertEqual(flexbuffers.Loads(data), values) + + def test_fixed_typed_vector(self): + + def encode_fixed_typed_vector(elements, element_type=None): + fbb = flexbuffers.Builder() + fbb.FixedTypedVectorFromElements(elements, element_type) + return fbb.Finish() + + for elements in ((-2, 2), (1, 2, 3), (100, -100, 200, -200), (4.0, 7.0), + (0.0, 1.0, 8.0), (9.0, 7.0, 1.0, 5.5)): + with self.subTest(elements=elements): + data = encode_fixed_typed_vector(elements) + self.assertSequenceEqual(flexbuffers.Loads(data), elements) + + elements = [-170, 432, 0, -7] + data = encode_fixed_typed_vector(elements, Type.INT) + self.assertSequenceEqual(flexbuffers.Loads(data), elements) + + with self.assertRaises(ValueError): + encode_fixed_typed_vector([]) # Invalid input length + + with self.assertRaises(ValueError): + encode_fixed_typed_vector([1]) # Invalid input length + + with self.assertRaises(ValueError): + encode_fixed_typed_vector([1, 2, 3, 4, 5]) # Invalid input length + + with self.assertRaises(TypeError): + encode_fixed_typed_vector([1, 1.0]) # Invalid input types + + with self.assertRaises(TypeError): + encode_fixed_typed_vector(['', '']) # Invalid input types + + def test_map_builder(self): + + def get_keys(data): + return [key.AsKey for key in flexbuffers.GetRoot(data).AsMap.Keys] + + # Empty map + fbb = flexbuffers.Builder() + with fbb.Map(): + pass + data = fbb.Finish() + + self.assertEqual(flexbuffers.Loads(data), {}) + + # Two-element map of Int + fbb = flexbuffers.Builder() + with fbb.Map(): + fbb.Int('y', -2) + fbb.Int('x', 10) + data = fbb.Finish() + + self.assertEqual(flexbuffers.Loads(data), {'x': 10, 'y': -2}) + + # Multiple-element map of vectors + fbb = flexbuffers.Builder() + with fbb.Map(): + with fbb.Vector('v'): + fbb.Int(45) + with fbb.TypedVector('tv'): + fbb.Int(-7) + fbb.FixedTypedVectorFromElements('ftv', [-2.0, 1.0]) + data = fbb.Finish() + + self.assertEqual( + flexbuffers.Loads(data), { + 'v': [45], + 'tv': [-7], + 'ftv': [-2.0, 1.0] + }) + + keys = get_keys(data) + self.assertEqual(sorted(keys), keys) + + # Multiple-element map of different types + fbb = flexbuffers.Builder() + with fbb.Map(): + fbb.Null('n') + fbb.Bool('b', False) + fbb.Int('i', -27) + fbb.UInt('u', 27) + fbb.Float('f', -0.85) + fbb.String('s', 'String') + fbb.Blob('bb', b'data') + fbb.IndirectInt('ii', -9500) + fbb.IndirectUInt('iu', 540) + fbb.IndirectFloat('if', 0.0) + fbb.VectorFromElements('v', [2, 1, 0.0]) + fbb.TypedVectorFromElements('tv', [2, 1, 0]) + fbb.FixedTypedVectorFromElements('ftv', [2.0, -6.0]) + data = fbb.Finish() + + self.assertEqual( + flexbuffers.Loads(data), { + 'n': None, + 'b': False, + 'i': -27, + 'u': 27, + 'f': -0.85, + 's': 'String', + 'bb': b'data', + 'ii': -9500, + 'iu': 540, + 'if': 0.0, + 'v': [2, 1, 0.0], + 'tv': [2, 1, 0], + 'ftv': [2.0, -6.0] + }) + + keys = get_keys(data) + self.assertEqual(sorted(keys), keys) + + def test_map_python(self): + maps = [ + {}, + { + 'key': 'value' + }, + { + 'x': None, + 'y': 3400, + 'z': -7040 + }, + { + 'zzz': 100, + 'aaa': 5.0, + 'ccc': ['Test', 32, False, None, True] + }, + { + 'name': ['John', 'Smith'], + 'valid': True, + 'note': None, + 'address': { + 'lines': [175, 'Alhambra'], + 'city': 'San Francisco', + 'zip': 94123, + }, + }, + ] + + for m in maps: + self.assertEqual(flexbuffers.Loads(flexbuffers.Dumps(m)), m) + + def test_gold_from_file(self): + data = read_test_file(GOLD_FLEXBUFFER_FILE) + self.assertEqual(flexbuffers.Loads(data), GOLD_FLEXBUFFER_OBJ) + + def test_gold_from_builder(self): + fbb = flexbuffers.Builder() + with fbb.Map(): + with fbb.Vector('vec'): + fbb.Int(-100) + fbb.String('Fred') + fbb.IndirectFloat(4.0) + i_f = fbb.LastValue + fbb.Blob(bytes([77])) + fbb.Bool(False) + fbb.ReuseValue(i_f) + + vec = [1, 2, 3] + fbb.VectorFromElements('bar', vec) + fbb.FixedTypedVectorFromElements('bar3', [1, 2, 3]) + fbb.VectorFromElements('bools', [True, False, True, False]) + fbb.Bool('bool', True) + fbb.Float('foo', 100) + with fbb.Map('mymap'): + fbb.String('foo', 'Fred') + data = fbb.Finish() + + self.assertEqual(flexbuffers.Loads(data), GOLD_FLEXBUFFER_OBJ) + + def test_min_bit_width(self): + fbb = flexbuffers.Builder(force_min_bit_width=flexbuffers.BitWidth.W8) + fbb.TypedVectorFromElements([0, 1, 0, 1, 0]) + data = fbb.Finish() + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsTypedVector) + self.assertEqual(root.AsTypedVector.ByteWidth, 1) + + fbb = flexbuffers.Builder(force_min_bit_width=flexbuffers.BitWidth.W32) + fbb.TypedVectorFromElements([0, 1, 0, 1, 0]) + data = fbb.Finish() + + root = flexbuffers.GetRoot(data) + self.assertTrue(root.IsTypedVector) + self.assertEqual(root.AsTypedVector.ByteWidth, 4) + + def test_share_keys(self): + + def encode_key_vector(value, count, share_keys): + fbb = flexbuffers.Builder(share_keys=share_keys) + with fbb.Vector(): + for _ in range(count): + fbb.Key(value) + return fbb.Finish(), fbb.KeyPool.Elements + + data, pool = encode_key_vector('test', 10, share_keys=False) + self.assertEqual(len(pool), 0) + self.assertEqual(len(data), 74) + self.assertEqual(flexbuffers.Loads(data), 10 * ['test']) + + data, pool = encode_key_vector('test', 10, share_keys=True) + self.assertEqual(len(pool), 1) + self.assertEqual(pool[0], 'test'.encode('ascii')) + self.assertEqual(len(data), 29) + self.assertEqual(flexbuffers.Loads(data), 10 * ['test']) + + def test_share_strings(self): + + def encode_string_vector(value, count, share_strings): + fbb = flexbuffers.Builder(share_strings=share_strings) + with fbb.Vector(): + for _ in range(count): + fbb.String(value) + return fbb.Finish(), fbb.StringPool.Elements + + data, pool = encode_string_vector('test', 10, share_strings=False) + self.assertEqual(len(pool), 0) + self.assertEqual(len(data), 84) + self.assertEqual(flexbuffers.Loads(data), 10 * ['test']) + + data, pool = encode_string_vector('test', 10, share_strings=True) + self.assertEqual(len(pool), 1) + self.assertEqual(pool[0], 'test'.encode('utf-8')) + self.assertEqual(len(data), 30) + self.assertEqual(flexbuffers.Loads(data), 10 * ['test']) + + def test_invalid_stack_size(self): + fbb = flexbuffers.Builder() + + with self.assertRaises(RuntimeError): + fbb.Finish() + + fbb.Int(100) + fbb.Int(200) + with self.assertRaises(RuntimeError): + fbb.Finish() + + fbb.Clear() + fbb.Int(420) + fbb.Finish() + + +if __name__ == '__main__': + unittest.main()