summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorDmitriy Kovalev <dkovalev@google.com>2020-05-07 14:26:33 -0700
committerGitHub <noreply@github.com>2020-05-07 14:26:33 -0700
commitde89bd193370c8b33686f1f33edd63593e48cd3f (patch)
tree6aa825db27ce7142b5ce0af28ca12ffedd59f0f0 /python
parent8be05f6bd474a9942d41d98ad197d9813af1da1a (diff)
downloadflatbuffers-de89bd193370c8b33686f1f33edd63593e48cd3f.tar.gz
flatbuffers-de89bd193370c8b33686f1f33edd63593e48cd3f.tar.bz2
flatbuffers-de89bd193370c8b33686f1f33edd63593e48cd3f.zip
Implement flexbuffers in python (#5880)
Diffstat (limited to 'python')
-rw-r--r--python/flatbuffers/flexbuffers.py1527
1 files changed, 1527 insertions, 0 deletions
diff --git a/python/flatbuffers/flexbuffers.py b/python/flatbuffers/flexbuffers.py
new file mode 100644
index 00000000..da10668a
--- /dev/null
+++ b/python/flatbuffers/flexbuffers.py
@@ -0,0 +1,1527 @@
+# Lint as: python3
+# Copyright 2020 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Implementation of FlexBuffers binary format.
+
+For more info check https://google.github.io/flatbuffers/flexbuffers.html and
+corresponding C++ implementation at
+https://github.com/google/flatbuffers/blob/master/include/flatbuffers/flexbuffers.h
+"""
+
+# pylint: disable=invalid-name
+# TODO(dkovalev): Add type hints everywhere, so tools like pytypes could work.
+
+import array
+import contextlib
+import enum
+import struct
+
+__all__ = ('Type', 'Builder', 'GetRoot', 'Dumps', 'Loads')
+
+
+class BitWidth(enum.IntEnum):
+ """Supported bit widths of value types.
+
+ These are used in the lower 2 bits of a type field to determine the size of
+ the elements (and or size field) of the item pointed to (e.g. vector).
+ """
+ W8 = 0 # 2^0 = 1 byte
+ W16 = 1 # 2^1 = 2 bytes
+ W32 = 2 # 2^2 = 4 bytes
+ W64 = 3 # 2^3 = 8 bytes
+
+ @staticmethod
+ def U(value):
+ """Returns the minimum `BitWidth` to encode unsigned integer value."""
+ assert value >= 0
+
+ if value < (1 << 8):
+ return BitWidth.W8
+ elif value < (1 << 16):
+ return BitWidth.W16
+ elif value < (1 << 32):
+ return BitWidth.W32
+ elif value < (1 << 64):
+ return BitWidth.W64
+ else:
+ raise ValueError('value is too big to encode: %s' % value)
+
+ @staticmethod
+ def I(value):
+ """Returns the minimum `BitWidth` to encode signed integer value."""
+ # -2^(n-1) <= value < 2^(n-1)
+ # -2^n <= 2 * value < 2^n
+ # 2 * value < 2^n, when value >= 0 or 2 * (-value) <= 2^n, when value < 0
+ # 2 * value < 2^n, when value >= 0 or 2 * (-value) - 1 < 2^n, when value < 0
+ #
+ # if value >= 0:
+ # return BitWidth.U(2 * value)
+ # else:
+ # return BitWidth.U(2 * (-value) - 1) # ~x = -x - 1
+ value *= 2
+ return BitWidth.U(value if value >= 0 else ~value)
+
+ @staticmethod
+ def F(value):
+ """Returns the `BitWidth` to encode floating point value."""
+ if struct.unpack('f', struct.pack('f', value))[0] == value:
+ return BitWidth.W32
+ return BitWidth.W64
+
+ @staticmethod
+ def B(byte_width):
+ return {
+ 1: BitWidth.W8,
+ 2: BitWidth.W16,
+ 4: BitWidth.W32,
+ 8: BitWidth.W64
+ }[byte_width]
+
+
+I = {1: 'b', 2: 'h', 4: 'i', 8: 'q'} # Integer formats
+U = {1: 'B', 2: 'H', 4: 'I', 8: 'Q'} # Unsigned integer formats
+F = {4: 'f', 8: 'd'} # Floating point formats
+
+
+def _Unpack(fmt, buf):
+ return struct.unpack(fmt[len(buf)], buf)[0]
+
+
+def _UnpackVector(fmt, buf, length):
+ byte_width = len(buf) // length
+ return struct.unpack('%d%s' % (length, fmt[byte_width]), buf)
+
+
+def _Pack(fmt, value, byte_width):
+ return struct.pack(fmt[byte_width], value)
+
+
+def _PackVector(fmt, values, byte_width):
+ return struct.pack('%d%s' % (len(values), fmt[byte_width]), *values)
+
+
+def _Mutate(fmt, buf, value, byte_width, value_bit_width):
+ if (1 << value_bit_width) <= byte_width:
+ buf[:byte_width] = _Pack(fmt, value, byte_width)
+ return True
+ return False
+
+
+# Computes how many bytes you'd have to pad to be able to write an
+# "scalar_size" scalar if the buffer had grown to "buf_size",
+# "scalar_size" is a power of two.
+def _PaddingBytes(buf_size, scalar_size):
+ # ((buf_size + (scalar_size - 1)) // scalar_size) * scalar_size - buf_size
+ return -buf_size & (scalar_size - 1)
+
+
+def _ShiftSlice(s, offset, length):
+ start = offset + (0 if s.start is None else s.start)
+ stop = offset + (length if s.stop is None else s.stop)
+ return slice(start, stop, s.step)
+
+
+# https://en.cppreference.com/w/cpp/algorithm/lower_bound
+def _LowerBound(values, value, pred):
+ """Implementation of C++ std::lower_bound() algorithm."""
+ first, last = 0, len(values)
+ count = last - first
+ while count > 0:
+ i = first
+ step = count // 2
+ i += step
+ if pred(values[i], value):
+ i += 1
+ first = i
+ count -= step + 1
+ else:
+ count = step
+ return first
+
+
+# https://en.cppreference.com/w/cpp/algorithm/binary_search
+def _BinarySearch(values, value, pred=lambda x, y: x < y):
+ """Implementation of C++ std::binary_search() algorithm."""
+ index = _LowerBound(values, value, pred)
+ if index != len(values) and not pred(value, values[index]):
+ return index
+ return -1
+
+
+class Type(enum.IntEnum):
+ """Supported types of encoded data.
+
+ These are used as the upper 6 bits of a type field to indicate the actual
+ type.
+ """
+ NULL = 0
+ INT = 1
+ UINT = 2
+ FLOAT = 3
+ # Types above stored inline, types below store an offset.
+ KEY = 4
+ STRING = 5
+ INDIRECT_INT = 6
+ INDIRECT_UINT = 7
+ INDIRECT_FLOAT = 8
+ MAP = 9
+ VECTOR = 10 # Untyped.
+
+ VECTOR_INT = 11 # Typed any size (stores no type table).
+ VECTOR_UINT = 12
+ VECTOR_FLOAT = 13
+ VECTOR_KEY = 14
+ # DEPRECATED, use VECTOR or VECTOR_KEY instead.
+ # Read test.cpp/FlexBuffersDeprecatedTest() for details on why.
+ VECTOR_STRING_DEPRECATED = 15
+
+ VECTOR_INT2 = 16 # Typed tuple (no type table, no size field).
+ VECTOR_UINT2 = 17
+ VECTOR_FLOAT2 = 18
+ VECTOR_INT3 = 19 # Typed triple (no type table, no size field).
+ VECTOR_UINT3 = 20
+ VECTOR_FLOAT3 = 21
+ VECTOR_INT4 = 22 # Typed quad (no type table, no size field).
+ VECTOR_UINT4 = 23
+ VECTOR_FLOAT4 = 24
+
+ BLOB = 25
+ BOOL = 26
+ VECTOR_BOOL = 36 # To do the same type of conversion of type to vector type
+
+ @staticmethod
+ def Pack(type_, bit_width):
+ return (int(type_) << 2) | bit_width
+
+ @staticmethod
+ def Unpack(packed_type):
+ return 1 << (packed_type & 0b11), Type(packed_type >> 2)
+
+ @staticmethod
+ def IsInline(type_):
+ return type_ <= Type.FLOAT or type_ == Type.BOOL
+
+ @staticmethod
+ def IsTypedVector(type_):
+ return Type.VECTOR_INT <= type_ <= Type.VECTOR_STRING_DEPRECATED or \
+ type_ == Type.VECTOR_BOOL
+
+ @staticmethod
+ def IsTypedVectorElementType(type_):
+ return Type.INT <= type_ <= Type.STRING or type_ == Type.BOOL
+
+ @staticmethod
+ def ToTypedVectorElementType(type_):
+ if not Type.IsTypedVector(type_):
+ raise ValueError('must be typed vector type')
+
+ return Type(type_ - Type.VECTOR_INT + Type.INT)
+
+ @staticmethod
+ def IsFixedTypedVector(type_):
+ return Type.VECTOR_INT2 <= type_ <= Type.VECTOR_FLOAT4
+
+ @staticmethod
+ def IsFixedTypedVectorElementType(type_):
+ return Type.INT <= type_ <= Type.FLOAT
+
+ @staticmethod
+ def ToFixedTypedVectorElementType(type_):
+ if not Type.IsFixedTypedVector(type_):
+ raise ValueError('must be fixed typed vector type')
+
+ # 3 types each, starting from length 2.
+ fixed_type = type_ - Type.VECTOR_INT2
+ return Type(fixed_type % 3 + Type.INT), fixed_type // 3 + 2
+
+ @staticmethod
+ def ToTypedVector(element_type, fixed_len=0):
+ """Converts element type to corresponding vector type.
+
+ Args:
+ element_type: vector element type
+ fixed_len: number of elements: 0 for typed vector; 2, 3, or 4 for fixed
+ typed vector.
+
+ Returns:
+ Typed vector type or fixed typed vector type.
+ """
+ if fixed_len == 0:
+ if not Type.IsTypedVectorElementType(element_type):
+ raise ValueError('must be typed vector element type')
+ else:
+ if not Type.IsFixedTypedVectorElementType(element_type):
+ raise ValueError('must be fixed typed vector element type')
+
+ offset = element_type - Type.INT
+ if fixed_len == 0:
+ return Type(offset + Type.VECTOR_INT) # TypedVector
+ elif fixed_len == 2:
+ return Type(offset + Type.VECTOR_INT2) # FixedTypedVector
+ elif fixed_len == 3:
+ return Type(offset + Type.VECTOR_INT3) # FixedTypedVector
+ elif fixed_len == 4:
+ return Type(offset + Type.VECTOR_INT4) # FixedTypedVector
+ else:
+ raise ValueError('unsupported fixed_len: %s' % fixed_len)
+
+
+class Buf:
+ """Class to access underlying buffer object starting from the given offset."""
+
+ def __init__(self, buf, offset):
+ self._buf = buf
+ self._offset = offset if offset >= 0 else len(buf) + offset
+ self._length = len(buf) - self._offset
+
+ def __getitem__(self, key):
+ if isinstance(key, slice):
+ return self._buf[_ShiftSlice(key, self._offset, self._length)]
+ elif isinstance(key, int):
+ return self._buf[self._offset + key]
+ else:
+ raise TypeError('invalid key type')
+
+ def __setitem__(self, key, value):
+ if isinstance(key, slice):
+ self._buf[_ShiftSlice(key, self._offset, self._length)] = value
+ elif isinstance(key, int):
+ self._buf[self._offset + key] = key
+ else:
+ raise TypeError('invalid key type')
+
+ def __repr__(self):
+ return 'buf[%d:]' % self._offset
+
+ def Find(self, sub):
+ """Returns the lowest index where the sub subsequence is found."""
+ return self._buf[self._offset:].find(sub)
+
+ def Slice(self, offset):
+ """Returns new `Buf` which starts from the given offset."""
+ return Buf(self._buf, self._offset + offset)
+
+ def Indirect(self, offset, byte_width):
+ """Return new `Buf` based on the encoded offset (indirect encoding)."""
+ return self.Slice(offset - _Unpack(U, self[offset:offset + byte_width]))
+
+
+class Object:
+ """Base class for all non-trivial data accessors."""
+ __slots__ = '_buf', '_byte_width'
+
+ def __init__(self, buf, byte_width):
+ self._buf = buf
+ self._byte_width = byte_width
+
+ @property
+ def ByteWidth(self):
+ return self._byte_width
+
+
+class Sized(Object):
+ """Base class for all data accessors which need to read encoded size."""
+ __slots__ = '_size',
+
+ def __init__(self, buf, byte_width, size=0):
+ super().__init__(buf, byte_width)
+ if size == 0:
+ self._size = _Unpack(U, self.SizeBytes)
+ else:
+ self._size = size
+
+ @property
+ def SizeBytes(self):
+ return self._buf[-self._byte_width:0]
+
+ def __len__(self):
+ return self._size
+
+
+class Blob(Sized):
+ """Data accessor for the encoded blob bytes."""
+ __slots__ = ()
+
+ @property
+ def Bytes(self):
+ return self._buf[0:len(self)]
+
+ def __repr__(self):
+ return 'Blob(%s, size=%d)' % (self._buf, len(self))
+
+
+class String(Sized):
+ """Data accessor for the encoded string bytes."""
+ __slots__ = ()
+
+ @property
+ def Bytes(self):
+ return self._buf[0:len(self)]
+
+ def Mutate(self, value):
+ """Mutates underlying string bytes in place.
+
+ Args:
+ value: New string to replace the existing one. New string must have less
+ or equal UTF-8-encoded bytes than the existing one to successfully
+ mutate underlying byte buffer.
+
+ Returns:
+ Whether the value was mutated or not.
+ """
+ encoded = value.encode('utf-8')
+ n = len(encoded)
+ if n <= len(self):
+ self._buf[-self._byte_width:0] = _Pack(U, n, self._byte_width)
+ self._buf[0:n] = encoded
+ self._buf[n:len(self)] = bytearray(len(self) - n)
+ return True
+ return False
+
+ def __str__(self):
+ return self.Bytes.decode('utf-8')
+
+ def __repr__(self):
+ return 'String(%s, size=%d)' % (self._buf, len(self))
+
+
+class Key(Object):
+ """Data accessor for the encoded key bytes."""
+ __slots__ = ()
+
+ def __init__(self, buf, byte_width):
+ assert byte_width == 1
+ super().__init__(buf, byte_width)
+
+ @property
+ def Bytes(self):
+ return self._buf[0:len(self)]
+
+ def __len__(self):
+ return self._buf.Find(0)
+
+ def __str__(self):
+ return self.Bytes.decode('ascii')
+
+ def __repr__(self):
+ return 'Key(%s, size=%d)' % (self._buf, len(self))
+
+
+class Vector(Sized):
+ """Data accessor for the encoded vector bytes."""
+ __slots__ = ()
+
+ def __getitem__(self, index):
+ if index < 0 or index >= len(self):
+ raise IndexError('vector index %s is out of [0, %d) range' % \
+ (index, len(self)))
+
+ packed_type = self._buf[len(self) * self._byte_width + index]
+ buf = self._buf.Slice(index * self._byte_width)
+ return Ref.PackedType(buf, self._byte_width, packed_type)
+
+ @property
+ def Value(self):
+ """Returns the underlying encoded data as a list object."""
+ return [e.Value for e in self]
+
+ def __repr__(self):
+ return 'Vector(%s, byte_width=%d, size=%d)' % \
+ (self._buf, self._byte_width, self._size)
+
+
+class TypedVector(Sized):
+ """Data accessor for the encoded typed vector or fixed typed vector bytes."""
+ __slots__ = '_element_type', '_size'
+
+ def __init__(self, buf, byte_width, element_type, size=0):
+ super().__init__(buf, byte_width, size)
+
+ if element_type == Type.STRING:
+ # These can't be accessed as strings, since we don't know the bit-width
+ # of the size field, see the declaration of
+ # FBT_VECTOR_STRING_DEPRECATED above for details.
+ # We change the type here to be keys, which are a subtype of strings,
+ # and will ignore the size field. This will truncate strings with
+ # embedded nulls.
+ element_type = Type.KEY
+
+ self._element_type = element_type
+
+ @property
+ def Bytes(self):
+ return self._buf[:self._byte_width * len(self)]
+
+ @property
+ def ElementType(self):
+ return self._element_type
+
+ def __getitem__(self, index):
+ if index < 0 or index >= len(self):
+ raise IndexError('vector index %s is out of [0, %d) range' % \
+ (index, len(self)))
+
+ buf = self._buf.Slice(index * self._byte_width)
+ return Ref(buf, self._byte_width, 1, self._element_type)
+
+ @property
+ def Value(self):
+ """Returns underlying data as list object."""
+ if not self:
+ return []
+
+ if self._element_type is Type.BOOL:
+ return [bool(e) for e in _UnpackVector(U, self.Bytes, len(self))]
+ elif self._element_type is Type.INT:
+ return list(_UnpackVector(I, self.Bytes, len(self)))
+ elif self._element_type is Type.UINT:
+ return list(_UnpackVector(U, self.Bytes, len(self)))
+ elif self._element_type is Type.FLOAT:
+ return list(_UnpackVector(F, self.Bytes, len(self)))
+ elif self._element_type is Type.KEY:
+ return [e.AsKey for e in self]
+ elif self._element_type is Type.STRING:
+ return [e.AsString for e in self]
+ else:
+ raise TypeError('unsupported element_type: %s' % self._element_type)
+
+ def __repr__(self):
+ return 'TypedVector(%s, byte_width=%d, element_type=%s, size=%d)' % \
+ (self._buf, self._byte_width, self._element_type, self._size)
+
+
+class Map(Vector):
+ """Data accessor for the encoded map bytes."""
+
+ @staticmethod
+ def CompareKeys(a, b):
+ if isinstance(a, Ref):
+ a = a.AsKeyBytes
+ if isinstance(b, Ref):
+ b = b.AsKeyBytes
+ return a < b
+
+ def __getitem__(self, key):
+ if isinstance(key, int):
+ return super().__getitem__(key)
+
+ index = _BinarySearch(self.Keys, key.encode('ascii'), self.CompareKeys)
+ if index != -1:
+ return super().__getitem__(index)
+
+ raise KeyError(key)
+
+ @property
+ def Keys(self):
+ byte_width = _Unpack(U, self._buf[-2 * self._byte_width:-self._byte_width])
+ buf = self._buf.Indirect(-3 * self._byte_width, self._byte_width)
+ return TypedVector(buf, byte_width, Type.KEY)
+
+ @property
+ def Values(self):
+ return Vector(self._buf, self._byte_width)
+
+ @property
+ def Value(self):
+ return {k.Value: v.Value for k, v in zip(self.Keys, self.Values)}
+
+ def __repr__(self):
+ return 'Map(%s, size=%d)' % (self._buf, len(self))
+
+
+class Ref:
+ """Data accessor for the encoded data bytes."""
+ __slots__ = '_buf', '_parent_width', '_byte_width', '_type'
+
+ @staticmethod
+ def PackedType(buf, parent_width, packed_type):
+ byte_width, type_ = Type.Unpack(packed_type)
+ return Ref(buf, parent_width, byte_width, type_)
+
+ def __init__(self, buf, parent_width, byte_width, type_):
+ self._buf = buf
+ self._parent_width = parent_width
+ self._byte_width = byte_width
+ self._type = type_
+
+ def __repr__(self):
+ return 'Ref(%s, parent_width=%d, byte_width=%d, type_=%s)' % \
+ (self._buf, self._parent_width, self._byte_width, self._type)
+
+ @property
+ def _Bytes(self):
+ return self._buf[:self._parent_width]
+
+ def _ConvertError(self, target_type):
+ raise TypeError('cannot convert %s to %s' % (self._type, target_type))
+
+ def _Indirect(self):
+ return self._buf.Indirect(0, self._parent_width)
+
+ @property
+ def IsNull(self):
+ return self._type is Type.NULL
+
+ @property
+ def IsBool(self):
+ return self._type is Type.BOOL
+
+ @property
+ def AsBool(self):
+ if self._type is Type.BOOL:
+ return bool(_Unpack(U, self._Bytes))
+ else:
+ return self.AsInt != 0
+
+ def MutateBool(self, value):
+ """Mutates underlying boolean value bytes in place.
+
+ Args:
+ value: New boolean value.
+
+ Returns:
+ Whether the value was mutated or not.
+ """
+ return self.IsBool and \
+ _Mutate(U, self._buf, value, self._parent_width, BitWidth.W8)
+
+ @property
+ def IsNumeric(self):
+ return self.IsInt or self.IsFloat
+
+ @property
+ def IsInt(self):
+ return self._type in (Type.INT, Type.INDIRECT_INT, Type.UINT,
+ Type.INDIRECT_UINT)
+
+ @property
+ def AsInt(self):
+ """Returns current reference as integer value."""
+ if self.IsNull:
+ return 0
+ elif self.IsBool:
+ return int(self.AsBool)
+ elif self._type is Type.INT:
+ return _Unpack(I, self._Bytes)
+ elif self._type is Type.INDIRECT_INT:
+ return _Unpack(I, self._Indirect()[:self._byte_width])
+ if self._type is Type.UINT:
+ return _Unpack(U, self._Bytes)
+ elif self._type is Type.INDIRECT_UINT:
+ return _Unpack(U, self._Indirect()[:self._byte_width])
+ elif self.IsString:
+ return len(self.AsString)
+ elif self.IsKey:
+ return len(self.AsKey)
+ elif self.IsBlob:
+ return len(self.AsBlob)
+ elif self.IsVector:
+ return len(self.AsVector)
+ elif self.IsTypedVector:
+ return len(self.AsTypedVector)
+ elif self.IsFixedTypedVector:
+ return len(self.AsFixedTypedVector)
+ else:
+ raise self._ConvertError(Type.INT)
+
+ def MutateInt(self, value):
+ """Mutates underlying integer value bytes in place.
+
+ Args:
+ value: New integer value. It must fit to the byte size of the existing
+ encoded value.
+
+ Returns:
+ Whether the value was mutated or not.
+ """
+ if self._type is Type.INT:
+ return _Mutate(I, self._buf, value, self._parent_width, BitWidth.I(value))
+ elif self._type is Type.INDIRECT_INT:
+ return _Mutate(I, self._Indirect(), value, self._byte_width,
+ BitWidth.I(value))
+ elif self._type is Type.UINT:
+ return _Mutate(U, self._buf, value, self._parent_width, BitWidth.U(value))
+ elif self._type is Type.INDIRECT_UINT:
+ return _Mutate(U, self._Indirect(), value, self._byte_width,
+ BitWidth.U(value))
+ else:
+ return False
+
+ @property
+ def IsFloat(self):
+ return self._type in (Type.FLOAT, Type.INDIRECT_FLOAT)
+
+ @property
+ def AsFloat(self):
+ """Returns current reference as floating point value."""
+ if self.IsNull:
+ return 0.0
+ elif self.IsBool:
+ return float(self.AsBool)
+ elif self.IsInt:
+ return float(self.AsInt)
+ elif self._type is Type.FLOAT:
+ return _Unpack(F, self._Bytes)
+ elif self._type is Type.INDIRECT_FLOAT:
+ return _Unpack(F, self._Indirect()[:self._byte_width])
+ elif self.IsString:
+ return float(self.AsString)
+ elif self.IsVector:
+ return float(len(self.AsVector))
+ elif self.IsTypedVector():
+ return float(len(self.AsTypedVector))
+ elif self.IsFixedTypedVector():
+ return float(len(self.FixedTypedVector))
+ else:
+ raise self._ConvertError(Type.FLOAT)
+
+ def MutateFloat(self, value):
+ """Mutates underlying floating point value bytes in place.
+
+ Args:
+ value: New float value. It must fit to the byte size of the existing
+ encoded value.
+
+ Returns:
+ Whether the value was mutated or not.
+ """
+ if self._type is Type.FLOAT:
+ return _Mutate(F, self._buf, value, self._parent_width,
+ BitWidth.B(self._parent_width))
+ elif self._type is Type.INDIRECT_FLOAT:
+ return _Mutate(F, self._Indirect(), value, self._byte_width,
+ BitWidth.B(self._byte_width))
+ else:
+ return False
+
+ @property
+ def IsKey(self):
+ return self._type is Type.KEY
+
+ @property
+ def AsKeyBytes(self):
+ if self.IsKey:
+ return Key(self._Indirect(), self._byte_width).Bytes
+ else:
+ raise self._ConvertError(Type.KEY)
+
+ @property
+ def AsKey(self):
+ if self.IsKey:
+ return str(Key(self._Indirect(), self._byte_width))
+ else:
+ raise self._ConvertError(Type.KEY)
+
+ @property
+ def IsString(self):
+ return self._type is Type.STRING
+
+ @property
+ def AsString(self):
+ if self.IsString:
+ return str(String(self._Indirect(), self._byte_width))
+ elif self.IsKey:
+ return self.AsKey
+ else:
+ raise self._ConvertError(Type.STRING)
+
+ def MutateString(self, value):
+ return String(self._Indirect(), self._byte_width).Mutate(value)
+
+ @property
+ def IsBlob(self):
+ return self._type is Type.BLOB
+
+ @property
+ def AsBlob(self):
+ if self.IsBlob:
+ return Blob(self._Indirect(), self._byte_width).Bytes
+ else:
+ raise self._ConvertError(Type.BLOB)
+
+ @property
+ def IsAnyVector(self):
+ return self.IsVector or self.IsTypedVector or self.IsFixedTypedVector()
+
+ @property
+ def IsVector(self):
+ return self._type in (Type.VECTOR, Type.MAP)
+
+ @property
+ def AsVector(self):
+ if self.IsVector:
+ return Vector(self._Indirect(), self._byte_width)
+ else:
+ raise self._ConvertError(Type.VECTOR)
+
+ @property
+ def IsTypedVector(self):
+ return Type.IsTypedVector(self._type)
+
+ @property
+ def AsTypedVector(self):
+ if self.IsTypedVector:
+ return TypedVector(self._Indirect(), self._byte_width,
+ Type.ToTypedVectorElementType(self._type))
+ else:
+ raise self._ConvertError('TYPED_VECTOR')
+
+ @property
+ def IsFixedTypedVector(self):
+ return Type.IsFixedTypedVector(self._type)
+
+ @property
+ def AsFixedTypedVector(self):
+ if self.IsFixedTypedVector:
+ element_type, size = Type.ToFixedTypedVectorElementType(self._type)
+ return TypedVector(self._Indirect(), self._byte_width, element_type, size)
+ else:
+ raise self._ConvertError('FIXED_TYPED_VECTOR')
+
+ @property
+ def IsMap(self):
+ return self._type is Type.MAP
+
+ @property
+ def AsMap(self):
+ if self.IsMap:
+ return Map(self._Indirect(), self._byte_width)
+ else:
+ raise self._ConvertError(Type.MAP)
+
+ @property
+ def Value(self):
+ """Converts current reference to value of corresponding type.
+
+ This is equivalent to calling `AsInt` for integer values, `AsFloat` for
+ floating point values, etc.
+
+ Returns:
+ Value of corresponding type.
+ """
+ if self.IsNull:
+ return None
+ elif self.IsBool:
+ return self.AsBool
+ elif self.IsInt:
+ return self.AsInt
+ elif self.IsFloat:
+ return self.AsFloat
+ elif self.IsString:
+ return self.AsString
+ elif self.IsKey:
+ return self.AsKey
+ elif self.IsBlob:
+ return self.AsBlob
+ elif self.IsMap:
+ return self.AsMap.Value
+ elif self.IsVector:
+ return self.AsVector.Value
+ elif self.IsTypedVector:
+ return self.AsTypedVector.Value
+ elif self.IsFixedTypedVector:
+ return self.AsFixedTypedVector.Value
+ else:
+ raise TypeError('cannot convert %r to value' % self)
+
+
+def _IsIterable(obj):
+ try:
+ iter(obj)
+ return True
+ except TypeError:
+ return False
+
+
+class Value:
+ """Class to represent given value during the encoding process."""
+
+ @staticmethod
+ def Null():
+ return Value(0, Type.NULL, BitWidth.W8)
+
+ @staticmethod
+ def Bool(value):
+ return Value(value, Type.BOOL, BitWidth.W8)
+
+ @staticmethod
+ def Int(value, bit_width):
+ return Value(value, Type.INT, bit_width)
+
+ @staticmethod
+ def UInt(value, bit_width):
+ return Value(value, Type.UINT, bit_width)
+
+ @staticmethod
+ def Float(value, bit_width):
+ return Value(value, Type.FLOAT, bit_width)
+
+ @staticmethod
+ def Key(offset):
+ return Value(offset, Type.KEY, BitWidth.W8)
+
+ def __init__(self, value, type_, min_bit_width):
+ self._value = value
+ self._type = type_
+
+ # For scalars: of itself, for vector: of its elements, for string: length.
+ self._min_bit_width = min_bit_width
+
+ @property
+ def Value(self):
+ return self._value
+
+ @property
+ def Type(self):
+ return self._type
+
+ @property
+ def MinBitWidth(self):
+ return self._min_bit_width
+
+ def StoredPackedType(self, parent_bit_width=BitWidth.W8):
+ return Type.Pack(self._type, self.StoredWidth(parent_bit_width))
+
+ # We have an absolute offset, but want to store a relative offset
+ # elem_index elements beyond the current buffer end. Since whether
+ # the relative offset fits in a certain byte_width depends on
+ # the size of the elements before it (and their alignment), we have
+ # to test for each size in turn.
+ def ElemWidth(self, buf_size, elem_index=0):
+ if Type.IsInline(self._type):
+ return self._min_bit_width
+ for byte_width in 1, 2, 4, 8:
+ offset_loc = buf_size + _PaddingBytes(buf_size, byte_width) + \
+ elem_index * byte_width
+ bit_width = BitWidth.U(offset_loc - self._value)
+ if byte_width == (1 << bit_width):
+ return bit_width
+ raise ValueError('relative offset is too big')
+
+ def StoredWidth(self, parent_bit_width=BitWidth.W8):
+ if Type.IsInline(self._type):
+ return max(self._min_bit_width, parent_bit_width)
+ return self._min_bit_width
+
+ def __repr__(self):
+ return 'Value(%s, %s, %s)' % (self._value, self._type, self._min_bit_width)
+
+ def __str__(self):
+ return str(self._value)
+
+
+def InMap(func):
+ def wrapper(self, *args, **kwargs):
+ if isinstance(args[0], str):
+ self.Key(args[0])
+ func(self, *args[1:], **kwargs)
+ else:
+ func(self, *args, **kwargs)
+ return wrapper
+
+
+def InMapForString(func):
+ def wrapper(self, *args):
+ if len(args) == 1:
+ func(self, args[0])
+ elif len(args) == 2:
+ self.Key(args[0])
+ func(self, args[1])
+ else:
+ raise ValueError('invalid number of arguments')
+ return wrapper
+
+
+class Pool:
+ """Collection of (data, offset) pairs sorted by data for quick access."""
+
+ def __init__(self):
+ self._pool = [] # sorted list of (data, offset) tuples
+
+ def FindOrInsert(self, data, offset):
+ do = data, offset
+ index = _BinarySearch(self._pool, do, lambda a, b: a[0] < b[0])
+ if index != -1:
+ _, offset = self._pool[index]
+ return offset
+ self._pool.insert(index, do)
+ return None
+
+ def Clear(self):
+ self._pool = []
+
+ @property
+ def Elements(self):
+ return [data for data, _ in self._pool]
+
+
+class Builder:
+ """Helper class to encode structural data into flexbuffers format."""
+
+ def __init__(self,
+ share_strings=False,
+ share_keys=True,
+ force_min_bit_width=BitWidth.W8):
+ self._share_strings = share_strings
+ self._share_keys = share_keys
+ self._force_min_bit_width = force_min_bit_width
+
+ self._string_pool = Pool()
+ self._key_pool = Pool()
+
+ self._finished = False
+ self._buf = bytearray()
+ self._stack = []
+
+ def __len__(self):
+ return len(self._buf)
+
+ @property
+ def StringPool(self):
+ return self._string_pool
+
+ @property
+ def KeyPool(self):
+ return self._key_pool
+
+ def Clear(self):
+ self._string_pool.Clear()
+ self._key_pool.Clear()
+ self._finished = False
+ self._buf = bytearray()
+ self._stack = []
+
+ def Finish(self):
+ """Finishes encoding process and returns underlying buffer."""
+ if self._finished:
+ raise RuntimeError('builder has been already finished')
+
+ # If you hit this exception, you likely have objects that were never
+ # included in a parent. You need to have exactly one root to finish a
+ # buffer. Check your Start/End calls are matched, and all objects are inside
+ # some other object.
+ if len(self._stack) != 1:
+ raise RuntimeError('internal stack size must be one')
+
+ value = self._stack[0]
+ byte_width = self._Align(value.ElemWidth(len(self._buf)))
+ self._WriteAny(value, byte_width=byte_width) # Root value
+ self._Write(U, value.StoredPackedType(), byte_width=1) # Root type
+ self._Write(U, byte_width, byte_width=1) # Root size
+
+ self.finished = True
+ return self._buf
+
+ def _ReadKey(self, offset):
+ key = self._buf[offset:]
+ return key[:key.find(0)]
+
+ def _Align(self, alignment):
+ byte_width = 1 << alignment
+ self._buf.extend(b'\x00' * _PaddingBytes(len(self._buf), byte_width))
+ return byte_width
+
+ def _Write(self, fmt, value, byte_width):
+ self._buf.extend(_Pack(fmt, value, byte_width))
+
+ def _WriteVector(self, fmt, values, byte_width):
+ self._buf.extend(_PackVector(fmt, values, byte_width))
+
+ def _WriteOffset(self, offset, byte_width):
+ relative_offset = len(self._buf) - offset
+ assert byte_width == 8 or relative_offset < (1 << (8 * byte_width))
+ self._Write(U, relative_offset, byte_width)
+
+ def _WriteAny(self, value, byte_width):
+ fmt = {
+ Type.NULL: U, Type.BOOL: U, Type.INT: I, Type.UINT: U, Type.FLOAT: F
+ }.get(value.Type)
+ if fmt:
+ self._Write(fmt, value.Value, byte_width)
+ else:
+ self._WriteOffset(value.Value, byte_width)
+
+ def _WriteBlob(self, data, append_zero, type_):
+ bit_width = BitWidth.U(len(data))
+ byte_width = self._Align(bit_width)
+ self._Write(U, len(data), byte_width)
+ loc = len(self._buf)
+ self._buf.extend(data)
+ if append_zero:
+ self._buf.append(0)
+ self._stack.append(Value(loc, type_, bit_width))
+ return loc
+
+ def _WriteScalarVector(self, element_type, byte_width, elements, fixed):
+ """Writes scalar vector elements to the underlying buffer."""
+ bit_width = BitWidth.B(byte_width)
+ # If you get this exception, you're trying to write a vector with a size
+ # field that is bigger than the scalars you're trying to write (e.g. a
+ # byte vector > 255 elements). For such types, write a "blob" instead.
+ if BitWidth.U(len(elements)) > bit_width:
+ raise ValueError('too many elements for the given byte_width')
+
+ self._Align(bit_width)
+ if not fixed:
+ self._Write(U, len(elements), byte_width)
+
+ loc = len(self._buf)
+
+ fmt = {Type.INT: I, Type.UINT: U, Type.FLOAT: F}.get(element_type)
+ if not fmt:
+ raise TypeError('unsupported element_type')
+ self._WriteVector(fmt, elements, byte_width)
+
+ type_ = Type.ToTypedVector(element_type, len(elements) if fixed else 0)
+ self._stack.append(Value(loc, type_, bit_width))
+ return loc
+
+ def _CreateVector(self, elements, typed, fixed, keys=None):
+ """Writes vector elements to the underlying buffer."""
+ length = len(elements)
+
+ if fixed and not typed:
+ raise ValueError('fixed vector must be typed')
+
+ # Figure out smallest bit width we can store this vector with.
+ bit_width = max(self._force_min_bit_width, BitWidth.U(length))
+ prefix_elems = 1 # Vector size
+ if keys:
+ bit_width = max(bit_width, keys.ElemWidth(len(self._buf)))
+ prefix_elems += 2 # Offset to the keys vector and its byte width.
+
+ vector_type = Type.KEY
+ # Check bit widths and types for all elements.
+ for i, e in enumerate(elements):
+ bit_width = max(bit_width, e.ElemWidth(len(self._buf), prefix_elems + i))
+
+ if typed:
+ if i == 0:
+ vector_type = e.Type
+ else:
+ if vector_type != e.Type:
+ raise RuntimeError('typed vector elements must be of the same type')
+
+ if fixed and not Type.IsFixedTypedVectorElementType(vector_type):
+ raise RuntimeError('must be fixed typed vector element type')
+
+ byte_width = self._Align(bit_width)
+ # Write vector. First the keys width/offset if available, and size.
+ if keys:
+ self._WriteOffset(keys.Value, byte_width)
+ self._Write(U, 1 << keys.MinBitWidth, byte_width)
+
+ if not fixed:
+ self._Write(U, length, byte_width)
+
+ # Then the actual data.
+ loc = len(self._buf)
+ for e in elements:
+ self._WriteAny(e, byte_width)
+
+ # Then the types.
+ if not typed:
+ for e in elements:
+ self._buf.append(e.StoredPackedType(bit_width))
+
+ if keys:
+ type_ = Type.MAP
+ else:
+ if typed:
+ type_ = Type.ToTypedVector(vector_type, length if fixed else 0)
+ else:
+ type_ = Type.VECTOR
+
+ return Value(loc, type_, bit_width)
+
+ def _PushIndirect(self, value, type_, bit_width):
+ byte_width = self._Align(bit_width)
+ loc = len(self._buf)
+ fmt = {
+ Type.INDIRECT_INT: I,
+ Type.INDIRECT_UINT: U,
+ Type.INDIRECT_FLOAT: F
+ }[type_]
+ self._Write(fmt, value, byte_width)
+ self._stack.append(Value(loc, type_, bit_width))
+
+ @InMapForString
+ def String(self, value):
+ """Encodes string value."""
+ reset_to = len(self._buf)
+ encoded = value.encode('utf-8')
+ loc = self._WriteBlob(encoded, append_zero=True, type_=Type.STRING)
+ if self._share_strings:
+ prev_loc = self._string_pool.FindOrInsert(encoded, loc)
+ if prev_loc is not None:
+ del self._buf[reset_to:]
+ self._stack[-1]._value = loc = prev_loc # pylint: disable=protected-access
+
+ return loc
+
+ @InMap
+ def Blob(self, value):
+ """Encodes binary blob value.
+
+ Args:
+ value: A byte/bytearray value to encode
+
+ Returns:
+ Offset of the encoded value in underlying the byte buffer.
+ """
+ return self._WriteBlob(value, append_zero=False, type_=Type.BLOB)
+
+ def Key(self, value):
+ """Encodes key value.
+
+ Args:
+ value: A byte/bytearray/str value to encode. Byte object must not contain
+ zero bytes. String object must be convertible to ASCII.
+
+ Returns:
+ Offset of the encoded value in the underlying byte buffer.
+ """
+ if isinstance(value, (bytes, bytearray)):
+ encoded = value
+ else:
+ encoded = value.encode('ascii')
+
+ if 0 in encoded:
+ raise ValueError('key contains zero byte')
+
+ loc = len(self._buf)
+ self._buf.extend(encoded)
+ self._buf.append(0)
+ if self._share_keys:
+ prev_loc = self._key_pool.FindOrInsert(encoded, loc)
+ if prev_loc is not None:
+ del self._buf[loc:]
+ loc = prev_loc
+
+ self._stack.append(Value.Key(loc))
+ return loc
+
+ def Null(self, key=None):
+ """Encodes None value."""
+ if key:
+ self.Key(key)
+ self._stack.append(Value.Null())
+
+ @InMap
+ def Bool(self, value):
+ """Encodes boolean value.
+
+ Args:
+ value: A boolean value.
+ """
+ self._stack.append(Value.Bool(value))
+
+ @InMap
+ def Int(self, value, byte_width=0):
+ """Encodes signed integer value.
+
+ Args:
+ value: A signed integer value.
+ byte_width: Number of bytes to use: 1, 2, 4, or 8.
+ """
+ bit_width = BitWidth.I(value) if byte_width == 0 else BitWidth.B(byte_width)
+ self._stack.append(Value.Int(value, bit_width))
+
+ @InMap
+ def IndirectInt(self, value, byte_width=0):
+ """Encodes signed integer value indirectly.
+
+ Args:
+ value: A signed integer value.
+ byte_width: Number of bytes to use: 1, 2, 4, or 8.
+ """
+ bit_width = BitWidth.I(value) if byte_width == 0 else BitWidth.B(byte_width)
+ self._PushIndirect(value, Type.INDIRECT_INT, bit_width)
+
+ @InMap
+ def UInt(self, value, byte_width=0):
+ """Encodes unsigned integer value.
+
+ Args:
+ value: An unsigned integer value.
+ byte_width: Number of bytes to use: 1, 2, 4, or 8.
+ """
+ bit_width = BitWidth.U(value) if byte_width == 0 else BitWidth.B(byte_width)
+ self._stack.append(Value.UInt(value, bit_width))
+
+ @InMap
+ def IndirectUInt(self, value, byte_width=0):
+ """Encodes unsigned integer value indirectly.
+
+ Args:
+ value: An unsigned integer value.
+ byte_width: Number of bytes to use: 1, 2, 4, or 8.
+ """
+ bit_width = BitWidth.U(value) if byte_width == 0 else BitWidth.B(byte_width)
+ self._PushIndirect(value, Type.INDIRECT_UINT, bit_width)
+
+ @InMap
+ def Float(self, value, byte_width=0):
+ """Encodes floating point value.
+
+ Args:
+ value: A floating point value.
+ byte_width: Number of bytes to use: 4 or 8.
+ """
+ bit_width = BitWidth.F(value) if byte_width == 0 else BitWidth.B(byte_width)
+ self._stack.append(Value.Float(value, bit_width))
+
+ @InMap
+ def IndirectFloat(self, value, byte_width=0):
+ """Encodes floating point value indirectly.
+
+ Args:
+ value: A floating point value.
+ byte_width: Number of bytes to use: 4 or 8.
+ """
+ bit_width = BitWidth.F(value) if byte_width == 0 else BitWidth.B(byte_width)
+ self._PushIndirect(value, Type.INDIRECT_FLOAT, bit_width)
+
+ def _StartVector(self):
+ """Starts vector construction."""
+ return len(self._stack)
+
+ def _EndVector(self, start, typed, fixed):
+ """Finishes vector construction by encodung its elements."""
+ vec = self._CreateVector(self._stack[start:], typed, fixed)
+ del self._stack[start:]
+ self._stack.append(vec)
+ return vec.Value
+
+ @contextlib.contextmanager
+ def Vector(self, key=None):
+ if key:
+ self.Key(key)
+
+ try:
+ start = self._StartVector()
+ yield self
+ finally:
+ self._EndVector(start, typed=False, fixed=False)
+
+ @InMap
+ def VectorFromElements(self, elements):
+ """Encodes sequence of any elements as a vector.
+
+ Args:
+ elements: sequence of elements, they may have different types.
+ """
+ with self.Vector():
+ for e in elements:
+ self.Add(e)
+
+ @contextlib.contextmanager
+ def TypedVector(self, key=None):
+ if key:
+ self.Key(key)
+
+ try:
+ start = self._StartVector()
+ yield self
+ finally:
+ self._EndVector(start, typed=True, fixed=False)
+
+ @InMap
+ def TypedVectorFromElements(self, elements, element_type=None):
+ """Encodes sequence of elements of the same type as typed vector.
+
+ Args:
+ elements: Sequence of elements, they must be of the same type.
+ element_type: Suggested element type. Setting it to None means determining
+ correct value automatically based on the given elements.
+ """
+ if isinstance(elements, array.array):
+ if elements.typecode == 'f':
+ self._WriteScalarVector(Type.FLOAT, 4, elements, fixed=False)
+ elif elements.typecode == 'd':
+ self._WriteScalarVector(Type.FLOAT, 8, elements, fixed=False)
+ elif elements.typecode in ('b', 'h', 'i', 'l', 'q'):
+ self._WriteScalarVector(
+ Type.INT, elements.itemsize, elements, fixed=False)
+ elif elements.typecode in ('B', 'H', 'I', 'L', 'Q'):
+ self._WriteScalarVector(
+ Type.UINT, elements.itemsize, elements, fixed=False)
+ else:
+ raise ValueError('unsupported array typecode: %s' % elements.typecode)
+ else:
+ add = self.Add if element_type is None else self.Adder(element_type)
+ with self.TypedVector():
+ for e in elements:
+ add(e)
+
+ @InMap
+ def FixedTypedVectorFromElements(self,
+ elements,
+ element_type=None,
+ byte_width=0):
+ """Encodes sequence of elements of the same type as fixed typed vector.
+
+ Args:
+ elements: Sequence of elements, they must be of the same type. Allowed
+ types are `Type.INT`, `Type.UINT`, `Type.FLOAT`. Allowed number of
+ elements are 2, 3, or 4.
+ element_type: Suggested element type. Setting it to None means determining
+ correct value automatically based on the given elements.
+ byte_width: Number of bytes to use per element. For `Type.INT` and
+ `Type.UINT`: 1, 2, 4, or 8. For `Type.FLOAT`: 4 or 8. Setting it to 0
+ means determining correct value automatically based on the given
+ elements.
+ """
+ if not 2 <= len(elements) <= 4:
+ raise ValueError('only 2, 3, or 4 elements are supported')
+
+ types = {type(e) for e in elements}
+ if len(types) != 1:
+ raise TypeError('all elements must be of the same type')
+
+ type_, = types
+
+ if element_type is None:
+ element_type = {int: Type.INT, float: Type.FLOAT}.get(type_)
+ if not element_type:
+ raise TypeError('unsupported element_type: %s' % type_)
+
+ if byte_width == 0:
+ width = {
+ Type.UINT: BitWidth.U,
+ Type.INT: BitWidth.I,
+ Type.FLOAT: BitWidth.F
+ }[element_type]
+ byte_width = 1 << max(width(e) for e in elements)
+
+ self._WriteScalarVector(element_type, byte_width, elements, fixed=True)
+
+ def _StartMap(self):
+ """Starts map construction."""
+ return len(self._stack)
+
+ def _EndMap(self, start):
+ """Finishes map construction by encodung its elements."""
+ # Interleaved keys and values on the stack.
+ stack = self._stack[start:]
+
+ if len(stack) % 2 != 0:
+ raise RuntimeError('must be even number of keys and values')
+
+ for key in stack[::2]:
+ if key.Type is not Type.KEY:
+ raise RuntimeError('all map keys must be of %s type' % Type.KEY)
+
+ pairs = zip(stack[::2], stack[1::2]) # [(key, value), ...]
+ pairs = sorted(pairs, key=lambda pair: self._ReadKey(pair[0].Value))
+
+ del self._stack[start:]
+ for pair in pairs:
+ self._stack.extend(pair)
+
+ keys = self._CreateVector(self._stack[start::2], typed=True, fixed=False)
+ values = self._CreateVector(
+ self._stack[start + 1::2], typed=False, fixed=False, keys=keys)
+
+ del self._stack[start:]
+ self._stack.append(values)
+ return values.Value
+
+ @contextlib.contextmanager
+ def Map(self, key=None):
+ if key:
+ self.Key(key)
+
+ try:
+ start = self._StartMap()
+ yield self
+ finally:
+ self._EndMap(start)
+
+ def MapFromElements(self, elements):
+ start = self._StartMap()
+ for k, v in elements.items():
+ self.Key(k)
+ self.Add(v)
+ self._EndMap(start)
+
+ def Adder(self, type_):
+ return {
+ Type.BOOL: self.Bool,
+ Type.INT: self.Int,
+ Type.INDIRECT_INT: self.IndirectInt,
+ Type.UINT: self.UInt,
+ Type.INDIRECT_UINT: self.IndirectUInt,
+ Type.FLOAT: self.Float,
+ Type.INDIRECT_FLOAT: self.IndirectFloat,
+ Type.KEY: self.Key,
+ Type.BLOB: self.Blob,
+ Type.STRING: self.String,
+ }[type_]
+
+ @InMapForString
+ def Add(self, value):
+ """Encodes value of any supported type."""
+ if value is None:
+ self.Null()
+ elif isinstance(value, bool):
+ self.Bool(value)
+ elif isinstance(value, int):
+ self.Int(value)
+ elif isinstance(value, float):
+ self.Float(value)
+ elif isinstance(value, str):
+ self.String(value)
+ elif isinstance(value, (bytes, bytearray)):
+ self.Blob(value)
+ elif isinstance(value, dict):
+ with self.Map():
+ for k, v in value.items():
+ self.Key(k)
+ self.Add(v)
+ elif isinstance(value, array.array):
+ self.TypedVectorFromElements(value)
+ elif _IsIterable(value):
+ self.VectorFromElements(value)
+ else:
+ raise TypeError('unsupported python type: %s' % type(value))
+
+ @property
+ def LastValue(self):
+ return self._stack[-1]
+
+ @InMap
+ def ReuseValue(self, value):
+ self._stack.append(value)
+
+
+def GetRoot(buf):
+ """Returns root `Ref` object for the given buffer."""
+ if len(buf) < 3:
+ raise ValueError('buffer is too small')
+ byte_width = buf[-1]
+ return Ref.PackedType(
+ Buf(buf, -(2 + byte_width)), byte_width, packed_type=buf[-2])
+
+
+def Dumps(obj):
+ """Returns bytearray with the encoded python object."""
+ fbb = Builder()
+ fbb.Add(obj)
+ return fbb.Finish()
+
+
+def Loads(buf):
+ """Returns python object decoded from the buffer."""
+ return GetRoot(buf).Value