diff options
author | Dmitriy Kovalev <dkovalev@google.com> | 2020-05-07 14:26:33 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-07 14:26:33 -0700 |
commit | de89bd193370c8b33686f1f33edd63593e48cd3f (patch) | |
tree | 6aa825db27ce7142b5ce0af28ca12ffedd59f0f0 /python | |
parent | 8be05f6bd474a9942d41d98ad197d9813af1da1a (diff) | |
download | flatbuffers-de89bd193370c8b33686f1f33edd63593e48cd3f.tar.gz flatbuffers-de89bd193370c8b33686f1f33edd63593e48cd3f.tar.bz2 flatbuffers-de89bd193370c8b33686f1f33edd63593e48cd3f.zip |
Implement flexbuffers in python (#5880)
Diffstat (limited to 'python')
-rw-r--r-- | python/flatbuffers/flexbuffers.py | 1527 |
1 files changed, 1527 insertions, 0 deletions
diff --git a/python/flatbuffers/flexbuffers.py b/python/flatbuffers/flexbuffers.py new file mode 100644 index 00000000..da10668a --- /dev/null +++ b/python/flatbuffers/flexbuffers.py @@ -0,0 +1,1527 @@ +# Lint as: python3 +# Copyright 2020 Google Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Implementation of FlexBuffers binary format. + +For more info check https://google.github.io/flatbuffers/flexbuffers.html and +corresponding C++ implementation at +https://github.com/google/flatbuffers/blob/master/include/flatbuffers/flexbuffers.h +""" + +# pylint: disable=invalid-name +# TODO(dkovalev): Add type hints everywhere, so tools like pytypes could work. + +import array +import contextlib +import enum +import struct + +__all__ = ('Type', 'Builder', 'GetRoot', 'Dumps', 'Loads') + + +class BitWidth(enum.IntEnum): + """Supported bit widths of value types. + + These are used in the lower 2 bits of a type field to determine the size of + the elements (and or size field) of the item pointed to (e.g. vector). + """ + W8 = 0 # 2^0 = 1 byte + W16 = 1 # 2^1 = 2 bytes + W32 = 2 # 2^2 = 4 bytes + W64 = 3 # 2^3 = 8 bytes + + @staticmethod + def U(value): + """Returns the minimum `BitWidth` to encode unsigned integer value.""" + assert value >= 0 + + if value < (1 << 8): + return BitWidth.W8 + elif value < (1 << 16): + return BitWidth.W16 + elif value < (1 << 32): + return BitWidth.W32 + elif value < (1 << 64): + return BitWidth.W64 + else: + raise ValueError('value is too big to encode: %s' % value) + + @staticmethod + def I(value): + """Returns the minimum `BitWidth` to encode signed integer value.""" + # -2^(n-1) <= value < 2^(n-1) + # -2^n <= 2 * value < 2^n + # 2 * value < 2^n, when value >= 0 or 2 * (-value) <= 2^n, when value < 0 + # 2 * value < 2^n, when value >= 0 or 2 * (-value) - 1 < 2^n, when value < 0 + # + # if value >= 0: + # return BitWidth.U(2 * value) + # else: + # return BitWidth.U(2 * (-value) - 1) # ~x = -x - 1 + value *= 2 + return BitWidth.U(value if value >= 0 else ~value) + + @staticmethod + def F(value): + """Returns the `BitWidth` to encode floating point value.""" + if struct.unpack('f', struct.pack('f', value))[0] == value: + return BitWidth.W32 + return BitWidth.W64 + + @staticmethod + def B(byte_width): + return { + 1: BitWidth.W8, + 2: BitWidth.W16, + 4: BitWidth.W32, + 8: BitWidth.W64 + }[byte_width] + + +I = {1: 'b', 2: 'h', 4: 'i', 8: 'q'} # Integer formats +U = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'} # Unsigned integer formats +F = {4: 'f', 8: 'd'} # Floating point formats + + +def _Unpack(fmt, buf): + return struct.unpack(fmt[len(buf)], buf)[0] + + +def _UnpackVector(fmt, buf, length): + byte_width = len(buf) // length + return struct.unpack('%d%s' % (length, fmt[byte_width]), buf) + + +def _Pack(fmt, value, byte_width): + return struct.pack(fmt[byte_width], value) + + +def _PackVector(fmt, values, byte_width): + return struct.pack('%d%s' % (len(values), fmt[byte_width]), *values) + + +def _Mutate(fmt, buf, value, byte_width, value_bit_width): + if (1 << value_bit_width) <= byte_width: + buf[:byte_width] = _Pack(fmt, value, byte_width) + return True + return False + + +# Computes how many bytes you'd have to pad to be able to write an +# "scalar_size" scalar if the buffer had grown to "buf_size", +# "scalar_size" is a power of two. +def _PaddingBytes(buf_size, scalar_size): + # ((buf_size + (scalar_size - 1)) // scalar_size) * scalar_size - buf_size + return -buf_size & (scalar_size - 1) + + +def _ShiftSlice(s, offset, length): + start = offset + (0 if s.start is None else s.start) + stop = offset + (length if s.stop is None else s.stop) + return slice(start, stop, s.step) + + +# https://en.cppreference.com/w/cpp/algorithm/lower_bound +def _LowerBound(values, value, pred): + """Implementation of C++ std::lower_bound() algorithm.""" + first, last = 0, len(values) + count = last - first + while count > 0: + i = first + step = count // 2 + i += step + if pred(values[i], value): + i += 1 + first = i + count -= step + 1 + else: + count = step + return first + + +# https://en.cppreference.com/w/cpp/algorithm/binary_search +def _BinarySearch(values, value, pred=lambda x, y: x < y): + """Implementation of C++ std::binary_search() algorithm.""" + index = _LowerBound(values, value, pred) + if index != len(values) and not pred(value, values[index]): + return index + return -1 + + +class Type(enum.IntEnum): + """Supported types of encoded data. + + These are used as the upper 6 bits of a type field to indicate the actual + type. + """ + NULL = 0 + INT = 1 + UINT = 2 + FLOAT = 3 + # Types above stored inline, types below store an offset. + KEY = 4 + STRING = 5 + INDIRECT_INT = 6 + INDIRECT_UINT = 7 + INDIRECT_FLOAT = 8 + MAP = 9 + VECTOR = 10 # Untyped. + + VECTOR_INT = 11 # Typed any size (stores no type table). + VECTOR_UINT = 12 + VECTOR_FLOAT = 13 + VECTOR_KEY = 14 + # DEPRECATED, use VECTOR or VECTOR_KEY instead. + # Read test.cpp/FlexBuffersDeprecatedTest() for details on why. + VECTOR_STRING_DEPRECATED = 15 + + VECTOR_INT2 = 16 # Typed tuple (no type table, no size field). + VECTOR_UINT2 = 17 + VECTOR_FLOAT2 = 18 + VECTOR_INT3 = 19 # Typed triple (no type table, no size field). + VECTOR_UINT3 = 20 + VECTOR_FLOAT3 = 21 + VECTOR_INT4 = 22 # Typed quad (no type table, no size field). + VECTOR_UINT4 = 23 + VECTOR_FLOAT4 = 24 + + BLOB = 25 + BOOL = 26 + VECTOR_BOOL = 36 # To do the same type of conversion of type to vector type + + @staticmethod + def Pack(type_, bit_width): + return (int(type_) << 2) | bit_width + + @staticmethod + def Unpack(packed_type): + return 1 << (packed_type & 0b11), Type(packed_type >> 2) + + @staticmethod + def IsInline(type_): + return type_ <= Type.FLOAT or type_ == Type.BOOL + + @staticmethod + def IsTypedVector(type_): + return Type.VECTOR_INT <= type_ <= Type.VECTOR_STRING_DEPRECATED or \ + type_ == Type.VECTOR_BOOL + + @staticmethod + def IsTypedVectorElementType(type_): + return Type.INT <= type_ <= Type.STRING or type_ == Type.BOOL + + @staticmethod + def ToTypedVectorElementType(type_): + if not Type.IsTypedVector(type_): + raise ValueError('must be typed vector type') + + return Type(type_ - Type.VECTOR_INT + Type.INT) + + @staticmethod + def IsFixedTypedVector(type_): + return Type.VECTOR_INT2 <= type_ <= Type.VECTOR_FLOAT4 + + @staticmethod + def IsFixedTypedVectorElementType(type_): + return Type.INT <= type_ <= Type.FLOAT + + @staticmethod + def ToFixedTypedVectorElementType(type_): + if not Type.IsFixedTypedVector(type_): + raise ValueError('must be fixed typed vector type') + + # 3 types each, starting from length 2. + fixed_type = type_ - Type.VECTOR_INT2 + return Type(fixed_type % 3 + Type.INT), fixed_type // 3 + 2 + + @staticmethod + def ToTypedVector(element_type, fixed_len=0): + """Converts element type to corresponding vector type. + + Args: + element_type: vector element type + fixed_len: number of elements: 0 for typed vector; 2, 3, or 4 for fixed + typed vector. + + Returns: + Typed vector type or fixed typed vector type. + """ + if fixed_len == 0: + if not Type.IsTypedVectorElementType(element_type): + raise ValueError('must be typed vector element type') + else: + if not Type.IsFixedTypedVectorElementType(element_type): + raise ValueError('must be fixed typed vector element type') + + offset = element_type - Type.INT + if fixed_len == 0: + return Type(offset + Type.VECTOR_INT) # TypedVector + elif fixed_len == 2: + return Type(offset + Type.VECTOR_INT2) # FixedTypedVector + elif fixed_len == 3: + return Type(offset + Type.VECTOR_INT3) # FixedTypedVector + elif fixed_len == 4: + return Type(offset + Type.VECTOR_INT4) # FixedTypedVector + else: + raise ValueError('unsupported fixed_len: %s' % fixed_len) + + +class Buf: + """Class to access underlying buffer object starting from the given offset.""" + + def __init__(self, buf, offset): + self._buf = buf + self._offset = offset if offset >= 0 else len(buf) + offset + self._length = len(buf) - self._offset + + def __getitem__(self, key): + if isinstance(key, slice): + return self._buf[_ShiftSlice(key, self._offset, self._length)] + elif isinstance(key, int): + return self._buf[self._offset + key] + else: + raise TypeError('invalid key type') + + def __setitem__(self, key, value): + if isinstance(key, slice): + self._buf[_ShiftSlice(key, self._offset, self._length)] = value + elif isinstance(key, int): + self._buf[self._offset + key] = key + else: + raise TypeError('invalid key type') + + def __repr__(self): + return 'buf[%d:]' % self._offset + + def Find(self, sub): + """Returns the lowest index where the sub subsequence is found.""" + return self._buf[self._offset:].find(sub) + + def Slice(self, offset): + """Returns new `Buf` which starts from the given offset.""" + return Buf(self._buf, self._offset + offset) + + def Indirect(self, offset, byte_width): + """Return new `Buf` based on the encoded offset (indirect encoding).""" + return self.Slice(offset - _Unpack(U, self[offset:offset + byte_width])) + + +class Object: + """Base class for all non-trivial data accessors.""" + __slots__ = '_buf', '_byte_width' + + def __init__(self, buf, byte_width): + self._buf = buf + self._byte_width = byte_width + + @property + def ByteWidth(self): + return self._byte_width + + +class Sized(Object): + """Base class for all data accessors which need to read encoded size.""" + __slots__ = '_size', + + def __init__(self, buf, byte_width, size=0): + super().__init__(buf, byte_width) + if size == 0: + self._size = _Unpack(U, self.SizeBytes) + else: + self._size = size + + @property + def SizeBytes(self): + return self._buf[-self._byte_width:0] + + def __len__(self): + return self._size + + +class Blob(Sized): + """Data accessor for the encoded blob bytes.""" + __slots__ = () + + @property + def Bytes(self): + return self._buf[0:len(self)] + + def __repr__(self): + return 'Blob(%s, size=%d)' % (self._buf, len(self)) + + +class String(Sized): + """Data accessor for the encoded string bytes.""" + __slots__ = () + + @property + def Bytes(self): + return self._buf[0:len(self)] + + def Mutate(self, value): + """Mutates underlying string bytes in place. + + Args: + value: New string to replace the existing one. New string must have less + or equal UTF-8-encoded bytes than the existing one to successfully + mutate underlying byte buffer. + + Returns: + Whether the value was mutated or not. + """ + encoded = value.encode('utf-8') + n = len(encoded) + if n <= len(self): + self._buf[-self._byte_width:0] = _Pack(U, n, self._byte_width) + self._buf[0:n] = encoded + self._buf[n:len(self)] = bytearray(len(self) - n) + return True + return False + + def __str__(self): + return self.Bytes.decode('utf-8') + + def __repr__(self): + return 'String(%s, size=%d)' % (self._buf, len(self)) + + +class Key(Object): + """Data accessor for the encoded key bytes.""" + __slots__ = () + + def __init__(self, buf, byte_width): + assert byte_width == 1 + super().__init__(buf, byte_width) + + @property + def Bytes(self): + return self._buf[0:len(self)] + + def __len__(self): + return self._buf.Find(0) + + def __str__(self): + return self.Bytes.decode('ascii') + + def __repr__(self): + return 'Key(%s, size=%d)' % (self._buf, len(self)) + + +class Vector(Sized): + """Data accessor for the encoded vector bytes.""" + __slots__ = () + + def __getitem__(self, index): + if index < 0 or index >= len(self): + raise IndexError('vector index %s is out of [0, %d) range' % \ + (index, len(self))) + + packed_type = self._buf[len(self) * self._byte_width + index] + buf = self._buf.Slice(index * self._byte_width) + return Ref.PackedType(buf, self._byte_width, packed_type) + + @property + def Value(self): + """Returns the underlying encoded data as a list object.""" + return [e.Value for e in self] + + def __repr__(self): + return 'Vector(%s, byte_width=%d, size=%d)' % \ + (self._buf, self._byte_width, self._size) + + +class TypedVector(Sized): + """Data accessor for the encoded typed vector or fixed typed vector bytes.""" + __slots__ = '_element_type', '_size' + + def __init__(self, buf, byte_width, element_type, size=0): + super().__init__(buf, byte_width, size) + + if element_type == Type.STRING: + # These can't be accessed as strings, since we don't know the bit-width + # of the size field, see the declaration of + # FBT_VECTOR_STRING_DEPRECATED above for details. + # We change the type here to be keys, which are a subtype of strings, + # and will ignore the size field. This will truncate strings with + # embedded nulls. + element_type = Type.KEY + + self._element_type = element_type + + @property + def Bytes(self): + return self._buf[:self._byte_width * len(self)] + + @property + def ElementType(self): + return self._element_type + + def __getitem__(self, index): + if index < 0 or index >= len(self): + raise IndexError('vector index %s is out of [0, %d) range' % \ + (index, len(self))) + + buf = self._buf.Slice(index * self._byte_width) + return Ref(buf, self._byte_width, 1, self._element_type) + + @property + def Value(self): + """Returns underlying data as list object.""" + if not self: + return [] + + if self._element_type is Type.BOOL: + return [bool(e) for e in _UnpackVector(U, self.Bytes, len(self))] + elif self._element_type is Type.INT: + return list(_UnpackVector(I, self.Bytes, len(self))) + elif self._element_type is Type.UINT: + return list(_UnpackVector(U, self.Bytes, len(self))) + elif self._element_type is Type.FLOAT: + return list(_UnpackVector(F, self.Bytes, len(self))) + elif self._element_type is Type.KEY: + return [e.AsKey for e in self] + elif self._element_type is Type.STRING: + return [e.AsString for e in self] + else: + raise TypeError('unsupported element_type: %s' % self._element_type) + + def __repr__(self): + return 'TypedVector(%s, byte_width=%d, element_type=%s, size=%d)' % \ + (self._buf, self._byte_width, self._element_type, self._size) + + +class Map(Vector): + """Data accessor for the encoded map bytes.""" + + @staticmethod + def CompareKeys(a, b): + if isinstance(a, Ref): + a = a.AsKeyBytes + if isinstance(b, Ref): + b = b.AsKeyBytes + return a < b + + def __getitem__(self, key): + if isinstance(key, int): + return super().__getitem__(key) + + index = _BinarySearch(self.Keys, key.encode('ascii'), self.CompareKeys) + if index != -1: + return super().__getitem__(index) + + raise KeyError(key) + + @property + def Keys(self): + byte_width = _Unpack(U, self._buf[-2 * self._byte_width:-self._byte_width]) + buf = self._buf.Indirect(-3 * self._byte_width, self._byte_width) + return TypedVector(buf, byte_width, Type.KEY) + + @property + def Values(self): + return Vector(self._buf, self._byte_width) + + @property + def Value(self): + return {k.Value: v.Value for k, v in zip(self.Keys, self.Values)} + + def __repr__(self): + return 'Map(%s, size=%d)' % (self._buf, len(self)) + + +class Ref: + """Data accessor for the encoded data bytes.""" + __slots__ = '_buf', '_parent_width', '_byte_width', '_type' + + @staticmethod + def PackedType(buf, parent_width, packed_type): + byte_width, type_ = Type.Unpack(packed_type) + return Ref(buf, parent_width, byte_width, type_) + + def __init__(self, buf, parent_width, byte_width, type_): + self._buf = buf + self._parent_width = parent_width + self._byte_width = byte_width + self._type = type_ + + def __repr__(self): + return 'Ref(%s, parent_width=%d, byte_width=%d, type_=%s)' % \ + (self._buf, self._parent_width, self._byte_width, self._type) + + @property + def _Bytes(self): + return self._buf[:self._parent_width] + + def _ConvertError(self, target_type): + raise TypeError('cannot convert %s to %s' % (self._type, target_type)) + + def _Indirect(self): + return self._buf.Indirect(0, self._parent_width) + + @property + def IsNull(self): + return self._type is Type.NULL + + @property + def IsBool(self): + return self._type is Type.BOOL + + @property + def AsBool(self): + if self._type is Type.BOOL: + return bool(_Unpack(U, self._Bytes)) + else: + return self.AsInt != 0 + + def MutateBool(self, value): + """Mutates underlying boolean value bytes in place. + + Args: + value: New boolean value. + + Returns: + Whether the value was mutated or not. + """ + return self.IsBool and \ + _Mutate(U, self._buf, value, self._parent_width, BitWidth.W8) + + @property + def IsNumeric(self): + return self.IsInt or self.IsFloat + + @property + def IsInt(self): + return self._type in (Type.INT, Type.INDIRECT_INT, Type.UINT, + Type.INDIRECT_UINT) + + @property + def AsInt(self): + """Returns current reference as integer value.""" + if self.IsNull: + return 0 + elif self.IsBool: + return int(self.AsBool) + elif self._type is Type.INT: + return _Unpack(I, self._Bytes) + elif self._type is Type.INDIRECT_INT: + return _Unpack(I, self._Indirect()[:self._byte_width]) + if self._type is Type.UINT: + return _Unpack(U, self._Bytes) + elif self._type is Type.INDIRECT_UINT: + return _Unpack(U, self._Indirect()[:self._byte_width]) + elif self.IsString: + return len(self.AsString) + elif self.IsKey: + return len(self.AsKey) + elif self.IsBlob: + return len(self.AsBlob) + elif self.IsVector: + return len(self.AsVector) + elif self.IsTypedVector: + return len(self.AsTypedVector) + elif self.IsFixedTypedVector: + return len(self.AsFixedTypedVector) + else: + raise self._ConvertError(Type.INT) + + def MutateInt(self, value): + """Mutates underlying integer value bytes in place. + + Args: + value: New integer value. It must fit to the byte size of the existing + encoded value. + + Returns: + Whether the value was mutated or not. + """ + if self._type is Type.INT: + return _Mutate(I, self._buf, value, self._parent_width, BitWidth.I(value)) + elif self._type is Type.INDIRECT_INT: + return _Mutate(I, self._Indirect(), value, self._byte_width, + BitWidth.I(value)) + elif self._type is Type.UINT: + return _Mutate(U, self._buf, value, self._parent_width, BitWidth.U(value)) + elif self._type is Type.INDIRECT_UINT: + return _Mutate(U, self._Indirect(), value, self._byte_width, + BitWidth.U(value)) + else: + return False + + @property + def IsFloat(self): + return self._type in (Type.FLOAT, Type.INDIRECT_FLOAT) + + @property + def AsFloat(self): + """Returns current reference as floating point value.""" + if self.IsNull: + return 0.0 + elif self.IsBool: + return float(self.AsBool) + elif self.IsInt: + return float(self.AsInt) + elif self._type is Type.FLOAT: + return _Unpack(F, self._Bytes) + elif self._type is Type.INDIRECT_FLOAT: + return _Unpack(F, self._Indirect()[:self._byte_width]) + elif self.IsString: + return float(self.AsString) + elif self.IsVector: + return float(len(self.AsVector)) + elif self.IsTypedVector(): + return float(len(self.AsTypedVector)) + elif self.IsFixedTypedVector(): + return float(len(self.FixedTypedVector)) + else: + raise self._ConvertError(Type.FLOAT) + + def MutateFloat(self, value): + """Mutates underlying floating point value bytes in place. + + Args: + value: New float value. It must fit to the byte size of the existing + encoded value. + + Returns: + Whether the value was mutated or not. + """ + if self._type is Type.FLOAT: + return _Mutate(F, self._buf, value, self._parent_width, + BitWidth.B(self._parent_width)) + elif self._type is Type.INDIRECT_FLOAT: + return _Mutate(F, self._Indirect(), value, self._byte_width, + BitWidth.B(self._byte_width)) + else: + return False + + @property + def IsKey(self): + return self._type is Type.KEY + + @property + def AsKeyBytes(self): + if self.IsKey: + return Key(self._Indirect(), self._byte_width).Bytes + else: + raise self._ConvertError(Type.KEY) + + @property + def AsKey(self): + if self.IsKey: + return str(Key(self._Indirect(), self._byte_width)) + else: + raise self._ConvertError(Type.KEY) + + @property + def IsString(self): + return self._type is Type.STRING + + @property + def AsString(self): + if self.IsString: + return str(String(self._Indirect(), self._byte_width)) + elif self.IsKey: + return self.AsKey + else: + raise self._ConvertError(Type.STRING) + + def MutateString(self, value): + return String(self._Indirect(), self._byte_width).Mutate(value) + + @property + def IsBlob(self): + return self._type is Type.BLOB + + @property + def AsBlob(self): + if self.IsBlob: + return Blob(self._Indirect(), self._byte_width).Bytes + else: + raise self._ConvertError(Type.BLOB) + + @property + def IsAnyVector(self): + return self.IsVector or self.IsTypedVector or self.IsFixedTypedVector() + + @property + def IsVector(self): + return self._type in (Type.VECTOR, Type.MAP) + + @property + def AsVector(self): + if self.IsVector: + return Vector(self._Indirect(), self._byte_width) + else: + raise self._ConvertError(Type.VECTOR) + + @property + def IsTypedVector(self): + return Type.IsTypedVector(self._type) + + @property + def AsTypedVector(self): + if self.IsTypedVector: + return TypedVector(self._Indirect(), self._byte_width, + Type.ToTypedVectorElementType(self._type)) + else: + raise self._ConvertError('TYPED_VECTOR') + + @property + def IsFixedTypedVector(self): + return Type.IsFixedTypedVector(self._type) + + @property + def AsFixedTypedVector(self): + if self.IsFixedTypedVector: + element_type, size = Type.ToFixedTypedVectorElementType(self._type) + return TypedVector(self._Indirect(), self._byte_width, element_type, size) + else: + raise self._ConvertError('FIXED_TYPED_VECTOR') + + @property + def IsMap(self): + return self._type is Type.MAP + + @property + def AsMap(self): + if self.IsMap: + return Map(self._Indirect(), self._byte_width) + else: + raise self._ConvertError(Type.MAP) + + @property + def Value(self): + """Converts current reference to value of corresponding type. + + This is equivalent to calling `AsInt` for integer values, `AsFloat` for + floating point values, etc. + + Returns: + Value of corresponding type. + """ + if self.IsNull: + return None + elif self.IsBool: + return self.AsBool + elif self.IsInt: + return self.AsInt + elif self.IsFloat: + return self.AsFloat + elif self.IsString: + return self.AsString + elif self.IsKey: + return self.AsKey + elif self.IsBlob: + return self.AsBlob + elif self.IsMap: + return self.AsMap.Value + elif self.IsVector: + return self.AsVector.Value + elif self.IsTypedVector: + return self.AsTypedVector.Value + elif self.IsFixedTypedVector: + return self.AsFixedTypedVector.Value + else: + raise TypeError('cannot convert %r to value' % self) + + +def _IsIterable(obj): + try: + iter(obj) + return True + except TypeError: + return False + + +class Value: + """Class to represent given value during the encoding process.""" + + @staticmethod + def Null(): + return Value(0, Type.NULL, BitWidth.W8) + + @staticmethod + def Bool(value): + return Value(value, Type.BOOL, BitWidth.W8) + + @staticmethod + def Int(value, bit_width): + return Value(value, Type.INT, bit_width) + + @staticmethod + def UInt(value, bit_width): + return Value(value, Type.UINT, bit_width) + + @staticmethod + def Float(value, bit_width): + return Value(value, Type.FLOAT, bit_width) + + @staticmethod + def Key(offset): + return Value(offset, Type.KEY, BitWidth.W8) + + def __init__(self, value, type_, min_bit_width): + self._value = value + self._type = type_ + + # For scalars: of itself, for vector: of its elements, for string: length. + self._min_bit_width = min_bit_width + + @property + def Value(self): + return self._value + + @property + def Type(self): + return self._type + + @property + def MinBitWidth(self): + return self._min_bit_width + + def StoredPackedType(self, parent_bit_width=BitWidth.W8): + return Type.Pack(self._type, self.StoredWidth(parent_bit_width)) + + # We have an absolute offset, but want to store a relative offset + # elem_index elements beyond the current buffer end. Since whether + # the relative offset fits in a certain byte_width depends on + # the size of the elements before it (and their alignment), we have + # to test for each size in turn. + def ElemWidth(self, buf_size, elem_index=0): + if Type.IsInline(self._type): + return self._min_bit_width + for byte_width in 1, 2, 4, 8: + offset_loc = buf_size + _PaddingBytes(buf_size, byte_width) + \ + elem_index * byte_width + bit_width = BitWidth.U(offset_loc - self._value) + if byte_width == (1 << bit_width): + return bit_width + raise ValueError('relative offset is too big') + + def StoredWidth(self, parent_bit_width=BitWidth.W8): + if Type.IsInline(self._type): + return max(self._min_bit_width, parent_bit_width) + return self._min_bit_width + + def __repr__(self): + return 'Value(%s, %s, %s)' % (self._value, self._type, self._min_bit_width) + + def __str__(self): + return str(self._value) + + +def InMap(func): + def wrapper(self, *args, **kwargs): + if isinstance(args[0], str): + self.Key(args[0]) + func(self, *args[1:], **kwargs) + else: + func(self, *args, **kwargs) + return wrapper + + +def InMapForString(func): + def wrapper(self, *args): + if len(args) == 1: + func(self, args[0]) + elif len(args) == 2: + self.Key(args[0]) + func(self, args[1]) + else: + raise ValueError('invalid number of arguments') + return wrapper + + +class Pool: + """Collection of (data, offset) pairs sorted by data for quick access.""" + + def __init__(self): + self._pool = [] # sorted list of (data, offset) tuples + + def FindOrInsert(self, data, offset): + do = data, offset + index = _BinarySearch(self._pool, do, lambda a, b: a[0] < b[0]) + if index != -1: + _, offset = self._pool[index] + return offset + self._pool.insert(index, do) + return None + + def Clear(self): + self._pool = [] + + @property + def Elements(self): + return [data for data, _ in self._pool] + + +class Builder: + """Helper class to encode structural data into flexbuffers format.""" + + def __init__(self, + share_strings=False, + share_keys=True, + force_min_bit_width=BitWidth.W8): + self._share_strings = share_strings + self._share_keys = share_keys + self._force_min_bit_width = force_min_bit_width + + self._string_pool = Pool() + self._key_pool = Pool() + + self._finished = False + self._buf = bytearray() + self._stack = [] + + def __len__(self): + return len(self._buf) + + @property + def StringPool(self): + return self._string_pool + + @property + def KeyPool(self): + return self._key_pool + + def Clear(self): + self._string_pool.Clear() + self._key_pool.Clear() + self._finished = False + self._buf = bytearray() + self._stack = [] + + def Finish(self): + """Finishes encoding process and returns underlying buffer.""" + if self._finished: + raise RuntimeError('builder has been already finished') + + # If you hit this exception, you likely have objects that were never + # included in a parent. You need to have exactly one root to finish a + # buffer. Check your Start/End calls are matched, and all objects are inside + # some other object. + if len(self._stack) != 1: + raise RuntimeError('internal stack size must be one') + + value = self._stack[0] + byte_width = self._Align(value.ElemWidth(len(self._buf))) + self._WriteAny(value, byte_width=byte_width) # Root value + self._Write(U, value.StoredPackedType(), byte_width=1) # Root type + self._Write(U, byte_width, byte_width=1) # Root size + + self.finished = True + return self._buf + + def _ReadKey(self, offset): + key = self._buf[offset:] + return key[:key.find(0)] + + def _Align(self, alignment): + byte_width = 1 << alignment + self._buf.extend(b'\x00' * _PaddingBytes(len(self._buf), byte_width)) + return byte_width + + def _Write(self, fmt, value, byte_width): + self._buf.extend(_Pack(fmt, value, byte_width)) + + def _WriteVector(self, fmt, values, byte_width): + self._buf.extend(_PackVector(fmt, values, byte_width)) + + def _WriteOffset(self, offset, byte_width): + relative_offset = len(self._buf) - offset + assert byte_width == 8 or relative_offset < (1 << (8 * byte_width)) + self._Write(U, relative_offset, byte_width) + + def _WriteAny(self, value, byte_width): + fmt = { + Type.NULL: U, Type.BOOL: U, Type.INT: I, Type.UINT: U, Type.FLOAT: F + }.get(value.Type) + if fmt: + self._Write(fmt, value.Value, byte_width) + else: + self._WriteOffset(value.Value, byte_width) + + def _WriteBlob(self, data, append_zero, type_): + bit_width = BitWidth.U(len(data)) + byte_width = self._Align(bit_width) + self._Write(U, len(data), byte_width) + loc = len(self._buf) + self._buf.extend(data) + if append_zero: + self._buf.append(0) + self._stack.append(Value(loc, type_, bit_width)) + return loc + + def _WriteScalarVector(self, element_type, byte_width, elements, fixed): + """Writes scalar vector elements to the underlying buffer.""" + bit_width = BitWidth.B(byte_width) + # If you get this exception, you're trying to write a vector with a size + # field that is bigger than the scalars you're trying to write (e.g. a + # byte vector > 255 elements). For such types, write a "blob" instead. + if BitWidth.U(len(elements)) > bit_width: + raise ValueError('too many elements for the given byte_width') + + self._Align(bit_width) + if not fixed: + self._Write(U, len(elements), byte_width) + + loc = len(self._buf) + + fmt = {Type.INT: I, Type.UINT: U, Type.FLOAT: F}.get(element_type) + if not fmt: + raise TypeError('unsupported element_type') + self._WriteVector(fmt, elements, byte_width) + + type_ = Type.ToTypedVector(element_type, len(elements) if fixed else 0) + self._stack.append(Value(loc, type_, bit_width)) + return loc + + def _CreateVector(self, elements, typed, fixed, keys=None): + """Writes vector elements to the underlying buffer.""" + length = len(elements) + + if fixed and not typed: + raise ValueError('fixed vector must be typed') + + # Figure out smallest bit width we can store this vector with. + bit_width = max(self._force_min_bit_width, BitWidth.U(length)) + prefix_elems = 1 # Vector size + if keys: + bit_width = max(bit_width, keys.ElemWidth(len(self._buf))) + prefix_elems += 2 # Offset to the keys vector and its byte width. + + vector_type = Type.KEY + # Check bit widths and types for all elements. + for i, e in enumerate(elements): + bit_width = max(bit_width, e.ElemWidth(len(self._buf), prefix_elems + i)) + + if typed: + if i == 0: + vector_type = e.Type + else: + if vector_type != e.Type: + raise RuntimeError('typed vector elements must be of the same type') + + if fixed and not Type.IsFixedTypedVectorElementType(vector_type): + raise RuntimeError('must be fixed typed vector element type') + + byte_width = self._Align(bit_width) + # Write vector. First the keys width/offset if available, and size. + if keys: + self._WriteOffset(keys.Value, byte_width) + self._Write(U, 1 << keys.MinBitWidth, byte_width) + + if not fixed: + self._Write(U, length, byte_width) + + # Then the actual data. + loc = len(self._buf) + for e in elements: + self._WriteAny(e, byte_width) + + # Then the types. + if not typed: + for e in elements: + self._buf.append(e.StoredPackedType(bit_width)) + + if keys: + type_ = Type.MAP + else: + if typed: + type_ = Type.ToTypedVector(vector_type, length if fixed else 0) + else: + type_ = Type.VECTOR + + return Value(loc, type_, bit_width) + + def _PushIndirect(self, value, type_, bit_width): + byte_width = self._Align(bit_width) + loc = len(self._buf) + fmt = { + Type.INDIRECT_INT: I, + Type.INDIRECT_UINT: U, + Type.INDIRECT_FLOAT: F + }[type_] + self._Write(fmt, value, byte_width) + self._stack.append(Value(loc, type_, bit_width)) + + @InMapForString + def String(self, value): + """Encodes string value.""" + reset_to = len(self._buf) + encoded = value.encode('utf-8') + loc = self._WriteBlob(encoded, append_zero=True, type_=Type.STRING) + if self._share_strings: + prev_loc = self._string_pool.FindOrInsert(encoded, loc) + if prev_loc is not None: + del self._buf[reset_to:] + self._stack[-1]._value = loc = prev_loc # pylint: disable=protected-access + + return loc + + @InMap + def Blob(self, value): + """Encodes binary blob value. + + Args: + value: A byte/bytearray value to encode + + Returns: + Offset of the encoded value in underlying the byte buffer. + """ + return self._WriteBlob(value, append_zero=False, type_=Type.BLOB) + + def Key(self, value): + """Encodes key value. + + Args: + value: A byte/bytearray/str value to encode. Byte object must not contain + zero bytes. String object must be convertible to ASCII. + + Returns: + Offset of the encoded value in the underlying byte buffer. + """ + if isinstance(value, (bytes, bytearray)): + encoded = value + else: + encoded = value.encode('ascii') + + if 0 in encoded: + raise ValueError('key contains zero byte') + + loc = len(self._buf) + self._buf.extend(encoded) + self._buf.append(0) + if self._share_keys: + prev_loc = self._key_pool.FindOrInsert(encoded, loc) + if prev_loc is not None: + del self._buf[loc:] + loc = prev_loc + + self._stack.append(Value.Key(loc)) + return loc + + def Null(self, key=None): + """Encodes None value.""" + if key: + self.Key(key) + self._stack.append(Value.Null()) + + @InMap + def Bool(self, value): + """Encodes boolean value. + + Args: + value: A boolean value. + """ + self._stack.append(Value.Bool(value)) + + @InMap + def Int(self, value, byte_width=0): + """Encodes signed integer value. + + Args: + value: A signed integer value. + byte_width: Number of bytes to use: 1, 2, 4, or 8. + """ + bit_width = BitWidth.I(value) if byte_width == 0 else BitWidth.B(byte_width) + self._stack.append(Value.Int(value, bit_width)) + + @InMap + def IndirectInt(self, value, byte_width=0): + """Encodes signed integer value indirectly. + + Args: + value: A signed integer value. + byte_width: Number of bytes to use: 1, 2, 4, or 8. + """ + bit_width = BitWidth.I(value) if byte_width == 0 else BitWidth.B(byte_width) + self._PushIndirect(value, Type.INDIRECT_INT, bit_width) + + @InMap + def UInt(self, value, byte_width=0): + """Encodes unsigned integer value. + + Args: + value: An unsigned integer value. + byte_width: Number of bytes to use: 1, 2, 4, or 8. + """ + bit_width = BitWidth.U(value) if byte_width == 0 else BitWidth.B(byte_width) + self._stack.append(Value.UInt(value, bit_width)) + + @InMap + def IndirectUInt(self, value, byte_width=0): + """Encodes unsigned integer value indirectly. + + Args: + value: An unsigned integer value. + byte_width: Number of bytes to use: 1, 2, 4, or 8. + """ + bit_width = BitWidth.U(value) if byte_width == 0 else BitWidth.B(byte_width) + self._PushIndirect(value, Type.INDIRECT_UINT, bit_width) + + @InMap + def Float(self, value, byte_width=0): + """Encodes floating point value. + + Args: + value: A floating point value. + byte_width: Number of bytes to use: 4 or 8. + """ + bit_width = BitWidth.F(value) if byte_width == 0 else BitWidth.B(byte_width) + self._stack.append(Value.Float(value, bit_width)) + + @InMap + def IndirectFloat(self, value, byte_width=0): + """Encodes floating point value indirectly. + + Args: + value: A floating point value. + byte_width: Number of bytes to use: 4 or 8. + """ + bit_width = BitWidth.F(value) if byte_width == 0 else BitWidth.B(byte_width) + self._PushIndirect(value, Type.INDIRECT_FLOAT, bit_width) + + def _StartVector(self): + """Starts vector construction.""" + return len(self._stack) + + def _EndVector(self, start, typed, fixed): + """Finishes vector construction by encodung its elements.""" + vec = self._CreateVector(self._stack[start:], typed, fixed) + del self._stack[start:] + self._stack.append(vec) + return vec.Value + + @contextlib.contextmanager + def Vector(self, key=None): + if key: + self.Key(key) + + try: + start = self._StartVector() + yield self + finally: + self._EndVector(start, typed=False, fixed=False) + + @InMap + def VectorFromElements(self, elements): + """Encodes sequence of any elements as a vector. + + Args: + elements: sequence of elements, they may have different types. + """ + with self.Vector(): + for e in elements: + self.Add(e) + + @contextlib.contextmanager + def TypedVector(self, key=None): + if key: + self.Key(key) + + try: + start = self._StartVector() + yield self + finally: + self._EndVector(start, typed=True, fixed=False) + + @InMap + def TypedVectorFromElements(self, elements, element_type=None): + """Encodes sequence of elements of the same type as typed vector. + + Args: + elements: Sequence of elements, they must be of the same type. + element_type: Suggested element type. Setting it to None means determining + correct value automatically based on the given elements. + """ + if isinstance(elements, array.array): + if elements.typecode == 'f': + self._WriteScalarVector(Type.FLOAT, 4, elements, fixed=False) + elif elements.typecode == 'd': + self._WriteScalarVector(Type.FLOAT, 8, elements, fixed=False) + elif elements.typecode in ('b', 'h', 'i', 'l', 'q'): + self._WriteScalarVector( + Type.INT, elements.itemsize, elements, fixed=False) + elif elements.typecode in ('B', 'H', 'I', 'L', 'Q'): + self._WriteScalarVector( + Type.UINT, elements.itemsize, elements, fixed=False) + else: + raise ValueError('unsupported array typecode: %s' % elements.typecode) + else: + add = self.Add if element_type is None else self.Adder(element_type) + with self.TypedVector(): + for e in elements: + add(e) + + @InMap + def FixedTypedVectorFromElements(self, + elements, + element_type=None, + byte_width=0): + """Encodes sequence of elements of the same type as fixed typed vector. + + Args: + elements: Sequence of elements, they must be of the same type. Allowed + types are `Type.INT`, `Type.UINT`, `Type.FLOAT`. Allowed number of + elements are 2, 3, or 4. + element_type: Suggested element type. Setting it to None means determining + correct value automatically based on the given elements. + byte_width: Number of bytes to use per element. For `Type.INT` and + `Type.UINT`: 1, 2, 4, or 8. For `Type.FLOAT`: 4 or 8. Setting it to 0 + means determining correct value automatically based on the given + elements. + """ + if not 2 <= len(elements) <= 4: + raise ValueError('only 2, 3, or 4 elements are supported') + + types = {type(e) for e in elements} + if len(types) != 1: + raise TypeError('all elements must be of the same type') + + type_, = types + + if element_type is None: + element_type = {int: Type.INT, float: Type.FLOAT}.get(type_) + if not element_type: + raise TypeError('unsupported element_type: %s' % type_) + + if byte_width == 0: + width = { + Type.UINT: BitWidth.U, + Type.INT: BitWidth.I, + Type.FLOAT: BitWidth.F + }[element_type] + byte_width = 1 << max(width(e) for e in elements) + + self._WriteScalarVector(element_type, byte_width, elements, fixed=True) + + def _StartMap(self): + """Starts map construction.""" + return len(self._stack) + + def _EndMap(self, start): + """Finishes map construction by encodung its elements.""" + # Interleaved keys and values on the stack. + stack = self._stack[start:] + + if len(stack) % 2 != 0: + raise RuntimeError('must be even number of keys and values') + + for key in stack[::2]: + if key.Type is not Type.KEY: + raise RuntimeError('all map keys must be of %s type' % Type.KEY) + + pairs = zip(stack[::2], stack[1::2]) # [(key, value), ...] + pairs = sorted(pairs, key=lambda pair: self._ReadKey(pair[0].Value)) + + del self._stack[start:] + for pair in pairs: + self._stack.extend(pair) + + keys = self._CreateVector(self._stack[start::2], typed=True, fixed=False) + values = self._CreateVector( + self._stack[start + 1::2], typed=False, fixed=False, keys=keys) + + del self._stack[start:] + self._stack.append(values) + return values.Value + + @contextlib.contextmanager + def Map(self, key=None): + if key: + self.Key(key) + + try: + start = self._StartMap() + yield self + finally: + self._EndMap(start) + + def MapFromElements(self, elements): + start = self._StartMap() + for k, v in elements.items(): + self.Key(k) + self.Add(v) + self._EndMap(start) + + def Adder(self, type_): + return { + Type.BOOL: self.Bool, + Type.INT: self.Int, + Type.INDIRECT_INT: self.IndirectInt, + Type.UINT: self.UInt, + Type.INDIRECT_UINT: self.IndirectUInt, + Type.FLOAT: self.Float, + Type.INDIRECT_FLOAT: self.IndirectFloat, + Type.KEY: self.Key, + Type.BLOB: self.Blob, + Type.STRING: self.String, + }[type_] + + @InMapForString + def Add(self, value): + """Encodes value of any supported type.""" + if value is None: + self.Null() + elif isinstance(value, bool): + self.Bool(value) + elif isinstance(value, int): + self.Int(value) + elif isinstance(value, float): + self.Float(value) + elif isinstance(value, str): + self.String(value) + elif isinstance(value, (bytes, bytearray)): + self.Blob(value) + elif isinstance(value, dict): + with self.Map(): + for k, v in value.items(): + self.Key(k) + self.Add(v) + elif isinstance(value, array.array): + self.TypedVectorFromElements(value) + elif _IsIterable(value): + self.VectorFromElements(value) + else: + raise TypeError('unsupported python type: %s' % type(value)) + + @property + def LastValue(self): + return self._stack[-1] + + @InMap + def ReuseValue(self, value): + self._stack.append(value) + + +def GetRoot(buf): + """Returns root `Ref` object for the given buffer.""" + if len(buf) < 3: + raise ValueError('buffer is too small') + byte_width = buf[-1] + return Ref.PackedType( + Buf(buf, -(2 + byte_width)), byte_width, packed_type=buf[-2]) + + +def Dumps(obj): + """Returns bytearray with the encoded python object.""" + fbb = Builder() + fbb.Add(obj) + return fbb.Finish() + + +def Loads(buf): + """Returns python object decoded from the buffer.""" + return GetRoot(buf).Value |