diff options
Diffstat (limited to 'tests/nnapi/nnapi_test_generator/android-q-beta/test_generator.py')
-rw-r--r-- | tests/nnapi/nnapi_test_generator/android-q-beta/test_generator.py | 1236 |
1 files changed, 1236 insertions, 0 deletions
diff --git a/tests/nnapi/nnapi_test_generator/android-q-beta/test_generator.py b/tests/nnapi/nnapi_test_generator/android-q-beta/test_generator.py new file mode 100644 index 000000000..f49385c78 --- /dev/null +++ b/tests/nnapi/nnapi_test_generator/android-q-beta/test_generator.py @@ -0,0 +1,1236 @@ +#!/usr/bin/python3 + +# Copyright 2017, The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""NN model compiler + +Contain classes definition and utilify functions for compiling models and +examples into NDK-based CTS and VTS unit tests. + +Used by cts_generator.py, vts_generator.py, and spec_visualizer.py +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +import argparse +import copy +from functools import reduce +import itertools +import math +import os +import re +import struct +import sys +import contextlib +import pprint +import numpy as np + +def GetJointStr(l, sep=", ", method=str): + return sep.join([method(i) for i in l]) + +# Print in C float literal format +def PrettyPrintAsFloat(x): + s = str(float(x)) + if s.find(".") >= 0 or s.find("e") >= 0: + return s + "f" + else: + return s + ".0f" + +# Transform from original type to float32 +def Dequantize(v, ty): + v -= ty.zeroPoint + if ty.scale != 0: + v *= ty.scale + if isinstance(ty.extraParams, SymmPerChannelQuantParams): + v *= ty.extraParams.GetScalesBroadcastArray(ty.dimensions) + return v + +# Transform float32 to target data type +def Quantize(v, ty): + if ty.scale != 0: + v /= ty.scale + if isinstance(ty.extraParams, SymmPerChannelQuantParams): + v = v / ty.extraParams.GetScalesBroadcastArray(ty.dimensions) + v += ty.zeroPoint + if not ty.IsFloat(): + v = np.round(v) + v = int(v) if np.isscalar(v) else v.astype(int) + if ty.type == "TENSOR_QUANT8_ASYMM": + v = np.minimum(np.maximum(v, 0), 255) + elif ty.type == "TENSOR_QUANT16_ASYMM": + v = np.minimum(np.maximum(v, 0), 65535) + elif ty.type == "TENSOR_QUANT8_SYMM_PER_CHANNEL": + v = np.minimum(np.maximum(v, -127), 127) + elif ty.type == "UINT32": + v = np.maximum(v, 0) + return v + +@contextlib.contextmanager +def SmartOpen(filename=None, mode="w"): + if filename and filename != '-': + fh = open(filename, mode) + else: + fh = sys.stdout + + try: + yield fh + finally: + if fh is not sys.stdout: + fh.close() + +# Tracking objects inside a model with a unique name +class NamedObject: + existingNames = set() + + def __init__(self, *args, sep="_", showZero=False, startsFrom=0, skipRenaming=False): + name = GetJointStr([i for i in args if i is not None and i != ""], sep=sep) + if skipRenaming: + self.name = name + return + # make the name unique by renaming with a suffix number + uniqueName = name if showZero is False else name + sep + str(startsFrom) + while uniqueName in self.__class__.existingNames: + startsFrom += 1 + uniqueName = name + sep + str(startsFrom) + self.__class__.existingNames.add(uniqueName) + self.name = uniqueName + + def __str__(self): + return self.name + __repr__ = __str__ + + # Since names are unique, objects with the same name are considered equal + def __eq__(self, other): + return isinstance(other, NamedObject) and self.name == other.name + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return hash(self.name) + + def __lt__(self, other): + return self.name < other.name + +# Types, operands should all have a unique name since they share the same namespace +class NamedVariable(NamedObject): + existingNames = set() + def __init__(self, *args, sep="_", showZero=False, startsFrom=0, skipRenaming=False): + NamedObject.__init__(self, *args, sep=sep, showZero=showZero, + startsFrom=startsFrom, skipRenaming=skipRenaming) + +# Global variables in the spec namespace such as CreateModel, is_ignored, and examples +class GlobalVariable(NamedVariable): + def __init__(self, *args, skipRenaming=False): + NamedObject.__init__(self, *args, startsFrom=1, skipRenaming=skipRenaming) + +# Each test should have a unique name, but will not conflict with variables +class NamedTest(NamedObject): + existingNames = set() + def __init__(self, *args, startsFrom=0, skipRenaming=False): + NamedObject.__init__(self, *args, startsFrom=1, skipRenaming=skipRenaming) + +class Type(NamedVariable): + typesMap = dict() + typeLookup = { + "INT32": "int32_t", + "UINT32": "uint32_t", + "FLOAT32": "float", + "FLOAT16": "_Float16", + "TENSOR_INT32": "int32_t", + "TENSOR_FLOAT16": "_Float16", + "TENSOR_FLOAT32": "float", + "TENSOR_QUANT8_ASYMM": "uint8_t", + "TENSOR_QUANT8_SYMM": "int8_t", + "BOOL": "bool8", + "TENSOR_QUANT16_ASYMM": "uint16_t", + "TENSOR_QUANT16_SYMM": "int16_t", + "TENSOR_BOOL8": "bool8", + "TENSOR_QUANT8_SYMM_PER_CHANNEL": "int8_t", +# "OEM_SCALAR": this is service-defined. + "TENSOR_OEM_BYTE": "uint8_t", + } + + # types are named as "type0", "type1", ... + def __init__(self, vt, dimensions, scale, zeroPoint, name="type", skipRenaming=False, + extraParams=None): + NamedVariable.__init__(self, name, sep="", showZero=True, skipRenaming=skipRenaming) + self.type = vt + self.dimensions = dimensions + self.scale = float(scale) + self.zeroPoint = int(zeroPoint) + self.extraParams = extraParams + + # Factory for Type object, only create a new Type if requested type does + # not have a match with all existing types + @staticmethod + def GetType(vt, dimensions, scale=0, zeroPoint=0, extraParams=None): + key = ",".join([vt, str(dimensions), str(scale), str(zeroPoint), str(extraParams)]) + if key not in Type.typesMap: + Type.typesMap[key] = Type(vt, dimensions, scale, zeroPoint, extraParams=extraParams) + return Type.typesMap[key] + + @staticmethod + def GetAllTypes(): + # sort to ensure a stable order when dumping the code + return sorted(Type.typesMap.values()) + + # For backward-compatibility + @staticmethod + def GetTypeFromString(vt, shape, extraParams=None): + dimensions, scale, zeroPoint = Type.GetParsedShape(shape) + scale = float(scale) + zeroPoint = int(zeroPoint) + return Type.GetType(vt, dimensions, scale, zeroPoint, extraParams) + + # For backward-compatibility + @staticmethod + def GetParsedShape(shape): + # Parse shape + if (shape != "" and shape != "{}"): + left, sep, right = shape.partition('{') + real_shape, sep, right = right.partition('}') + shape = [int(x) for x in real_shape.split(",")] + # left now looks like "0.0f, 127.5f, " + scale, sep, zero_point = right.rpartition(',') + if scale == "": + if zero_point == "": + return shape, "0", "0" + return shape, zero_point, "0" + left, sep, scale = scale.partition(',') + return shape, scale.replace("f", ""), zero_point + else: + return [], "0", "0" + + def GetNumberOfElements(self): + return reduce(lambda x,y: x*y, self.dimensions, 1) + + def GetCppTypeString(self): + return Type.typeLookup[self.type] + + def IsFloat(self): + return self.GetCppTypeString() in ["float", "_Float16"] + + def IsBool(self): + return self.GetCppTypeString() == "bool8" + + def IsScalar(self): + return not self.type.startswith("TENSOR_") + + def GetElementByteSize(self): + cppTypeString = self.GetCppTypeString() + if cppTypeString in ["uint8_t", "int8_t", "bool8"]: + return 1 + elif cppTypeString in ["int16_t", "uint16_t", "_Float16"]: + return 2 + else: + return 4 + + def GetByteSize(self): + return self.GetElementByteSize() * self.GetNumberOfElements() + + def GetDimensionsString(self): + return "{" + GetJointStr(self.dimensions) + "}" + + def GetSignatureTuple(self): + return (self.type, self.dimensions, self.scale, self.zeroPoint) + + def ToUnspecifiedDim(self): + return Type.GetType(self.type, [0] * len(self.dimensions), self.scale, self.zeroPoint) + +# To track implicitly convertible parameter types +class ImplicitParameter(): + @staticmethod + def ImplicitConvertion(value): + if isinstance(value, Operand): + return value + for implicitType in ImplicitParameter.__subclasses__(): + if implicitType.IsCompatible(value): + return implicitType("param", value) + assert False, "%s not supported for implicit parameter"%value + + +# ExtraParams with per-channel quantization. +class SymmPerChannelQuantParams(): + def __init__(self, channelDim, scales, hide = False): + self.channelDim = channelDim + self.scales = scales + self.hide = hide + + def GetScalesBroadcastArray(self, dimensions): + bshape = [1] * len(dimensions) + bshape[self.channelDim] = len(self.scales) + return np.array(self.scales).reshape(bshape) + + def GetConstructor(self): + return "SymmPerChannelQuantParams({%s},%d)" % ( + ", ".join(str(x) + "f" for x in self.scales), self.channelDim) + + def GetVtsSetter(self): + return "channelQuant" + + def GetVtsConstructor(self): + return "SymmPerChannelQuantParams{.scales={%s}, .channelDim=%d}" % ( + ", ".join(str(x) + "f" for x in self.scales), self.channelDim) + + +# An operand that can be fed into operations. Also, an operand is always +# declared before operations. +class Operand(NamedVariable): + + def __init__(self, name, opType, value, backward=None, skipRenaming=False, extraParams=None): + NamedVariable.__init__(self, name, sep="", skipRenaming=skipRenaming) + if type(opType) is str: + self.type = Type.GetTypeFromString(opType, value, extraParams) + value = backward + else: + self.type = Type.GetType(*opType, extraParams=extraParams) + self.SetValue(value) + self.dimensions = self.type.dimensions + self.lifetime = "TEMPORARY_VARIABLE" + self.model_index = None + self.ins = [] + self.outs = [] + + def SetValue(self, value): + self.value = value if type(value) is list or type(value) is tuple else [value] + return self + + def SetValueFromNumpy(self, value): + self.value = value.flatten().tolist() + return self + + def GetValueAsNumpy(self): + return np.array(self.value).reshape(self.type.dimensions) + + # Print value as cpp-style list initialization + def GetListInitialization(self): + assert self.value is not None, \ + "Trying to print operand %s with None value"%(str(self)) + if self.type.IsFloat(): + return "{%s}"%(GetJointStr(self.value, method=PrettyPrintAsFloat)) + elif self.type.IsBool(): + return "{%s}"%(GetJointStr(self.value, method=lambda v: "true" if v else "false")) + else: + return "{%s}"%(GetJointStr(self.value, method=lambda x: str(int(x)))) + + def ToUnspecifiedDim(self): + self.dimensions = self.type.dimensions + self.type = self.type.ToUnspecifiedDim() + +# Base class of user-defined input/output operand +class InOut(Operand): + + def __init__(self, name, opType, backward=None, skipRenaming=False, extraParams=None): + Operand.__init__(self, name, opType, backward, None, skipRenaming=skipRenaming, extraParams=extraParams) + self.lifetime = "MODEL_INPUT" + self.index = 0 + + def Feed(self, value): + self.SetValue(value[self] if type(value) is dict else value) + return self + + def GetListInitialization(self): + return "{%d, %s}"%(self.index, super().GetListInitialization()) + +# A user-declared input operand +class Input(InOut): + def __init__(self, name, opType, backward=None, skipRenaming=False, extraParams=None): + InOut.__init__(self, name, opType, backward, skipRenaming=skipRenaming, extraParams=extraParams) + self.lifetime = "MODEL_INPUT" + +# A user-declared output operand +class Output(InOut): + def __init__(self, name, opType, backward=None, skipRenaming=False): + InOut.__init__(self, name, opType, backward, skipRenaming=skipRenaming) + self.lifetime = "MODEL_OUTPUT" + +# An output that we don't want to compare the results +class IgnoredOutput(Output): + def __init__(self, name, opType, backward=None, skipRenaming=False): + Output.__init__(self, name, opType, backward, skipRenaming=skipRenaming) + self.lifetime = "MODEL_OUTPUT" + def Feed(self, value): + numElements = reduce(lambda x,y: x*y, self.dimensions, 1) + self.value = [0 for x in range(numElements)] + return self + +# An explicitly declared parameter +class Parameter(Operand): + def __init__(self, name, opType, value, backward=None, skipRenaming=False, extraParams=None): + Operand.__init__(self, name, opType, value, backward, skipRenaming=skipRenaming, + extraParams=extraParams) + self.initializer = NamedVariable(str(self) + "_init") + self.lifetime = "CONSTANT_REFERENCE" if Configuration.useSHM() else "CONSTANT_COPY" + +# A shortcut for parameters of INT32 +class Int32Scalar(Parameter, ImplicitParameter): + def __init__(self, name, value): + Parameter.__init__(self, name, ("INT32", []), int(value)) + @staticmethod + def IsCompatible(value): + return type(value) is int + +# A shortcut for parameters of FLOAT16 +class Float16Scalar(Parameter, ImplicitParameter): + def __init__(self, name, value): + Parameter.__init__(self, name, ("FLOAT16", []), float(value)) + @staticmethod + def IsCompatible(value): + return False + +# A shortcut for parameters of FLOAT32 +class Float32Scalar(Parameter, ImplicitParameter): + def __init__(self, name, value): + Parameter.__init__(self, name, ("FLOAT32", []), float(value)) + @staticmethod + def IsCompatible(value): + return type(value) is float + +# A shortcut for parameters of BOOL +class BoolScalar(Parameter, ImplicitParameter): + def __init__(self, name, value): + Parameter.__init__(self, name, ("BOOL", []), bool(value)) + @staticmethod + def IsCompatible(value): + return type(value) is bool + +# A shortcut for parameter of 1-D TENSOR_INT32 +class Int32Vector(Parameter, ImplicitParameter): + def __init__(self, name, value): + Parameter.__init__(self, name, ("TENSOR_INT32", [len(value)]), [int(v) for v in value]) + @staticmethod + def IsCompatible(value): + if type(value) is not list and type(value) is not tuple: + return False + return all(type(i) is int for i in value) + +# A shortcut for parameter of 1-D TENSOR_FLOAT32 +class Float32Vector(Parameter, ImplicitParameter): + def __init__(self, name, value): + Parameter.__init__(self, name, ("TENSOR_FLOAT32", [len(value)]), [float(v) for v in value]) + @staticmethod + def IsCompatible(value): + if type(value) is not list and type(value) is not tuple: + return False + return all(type(i) is float for i in value) + +# An explicitly declared intermediate result +class Internal(Operand): + def __init__(self, name, opType, backward=None, skipRenaming=False): + Operand.__init__(self, name, opType, backward, None, skipRenaming=skipRenaming) + self.lifetime = "TEMPORARY_VARIABLE" + +# An operation in a model, does not need a name +class Operation: + + def __init__(self, optype, ins, outs): + self.optype = optype + self.SetInputs(ins) + self.SetOutputs(outs) + + # for the ease of debugging + def __str__(self): + insString = GetJointStr(self.ins) + outsString = GetJointStr(self.outs) + return "Operation %s: [%s] -> [%s]"%(self.optype, insString, outsString) + __repr__ = __str__ + + def SetInputs(self, ins): + self.ins = [ImplicitParameter.ImplicitConvertion(i) for i in ins] + return self + + def SetOutputs(self, outs): + self.outs = list(outs) + return self + +# Main interface +class Model: + models = list() + + def __init__(self, name=None): + self.name = name + self.operations = [] + self.operands = [] + self.isRelaxed = False + self.compiled = False + self.dumped = False + self.hasDynamicOutputShape = False + self.version = FileNames.version + Model.models.append(self) + + def WithSuffix(self, *args): + self.createFunctionName = GlobalVariable("CreateModel", self.name, *args) + self.createTestFunctionName = GlobalVariable("createTestModel", self.name, *args) + self.isIgnoredFunctionName = GlobalVariable("is_ignored", self.name, *args) + return self + + def AddOperation(self, operation): + self.operations.append(operation) + for i in operation.ins: + if i not in self.operands: + self.operands.append(i) + for o in operation.outs: + if o not in self.operands: + self.operands.append(o) + return self + + def Operation(self, op_name, *args): + return self.AddOperation(Operation(op_name, args, [])) + + def To(self, *args): + assert len(self.operations) > 0 + if type(args[0]) is tuple or type(args[0]) is list: + outs = args[0] + else: + outs = args + self.operations[-1].SetOutputs(outs) + for o in outs: + if o not in self.operands: + self.operands.append(o) + return self + + def RelaxedExecution(self, isRelaxed): + self.isRelaxed = isRelaxed + return self + + def TestDynamicOutputShape(self, hasDynamicOutputShape): + self.hasDynamicOutputShape = hasDynamicOutputShape + return self + + # Sets the version of the model in compliance tests. Set to None to disable the test. + def IntroducedIn(self, ver): + self.version = ver + return self + + def GetTypes(self): + return sorted(list(set(op.type for op in self.operands))) + + def GetInputs(self): + return [i for i in self.operands if isinstance(i, Input)] + + def GetOutputs(self): + return [o for o in self.operands if isinstance(o, Output)] + + def GetInputsIndex(self): + return [i for i,op in enumerate(self.operands) if isinstance(op, Input)] + + def GetOutputsIndex(self): + return [o for o,op in enumerate(self.operands) if isinstance(op, Output)] + + def GetIndexOfOperands(self, operands): + return [self.operands.index(i) for i in operands] + + def GetIgnoredOutputs(self): + return [o for o in self.operands if isinstance(o, IgnoredOutput)] + + def GetParameters(self): + return [p for p in self.operands if isinstance(p, Parameter)] + + def GetEquivalentOperands(self, targets): + return [self.operands[self.operands.index(t)] for t in targets] + + def UpdateEquivalentOperands(self, targets): + for t in targets: + self.operands[self.operands.index(t)] = t + return self + + def SetOperandIndex(self): + for ind, i in enumerate(self.GetInputs()): + i.index = ind + for ind, o in enumerate(self.GetOutputs()): + o.index = ind + for ind, op in enumerate(self.operands): + op.model_index = ind + return self + + def SetOperandInsAndOuts(self): + for op in self.operands: + op.ins = list() + op.outs = list() + for op in self.operations: + op.ins = self.GetEquivalentOperands(op.ins) + op.outs = self.GetEquivalentOperands(op.outs) + for i in op.ins: + i.outs.append(op) + for o in op.outs: + o.ins.append(op) + return self + + def TopologicalSortHelper(self, op, deps, visited): + if op in visited: + assert op not in deps, "Cycle detected in the graph" + else: + visited.add(op) + for i in deps[op]: + self.TopologicalSortHelper(i, deps, visited) + self.operations.append(op) + deps.pop(op) + + # Topological sort of the operations, and detect if there is a cycle is the graph + def TopologicalSort(self): + deps = {op: list() for op in self.operations} + [deps[o].append(i) for op in self.operands for o in op.outs for i in op.ins] + operations = self.operations.copy() + self.operations = [] + visited = set() + for op in operations: + self.TopologicalSortHelper(op, deps, visited) + + def SetOutputUnspecified(self): + for op in self.operands: + op.dimensions = op.type.dimensions + if self.hasDynamicOutputShape: + for op in self.GetOutputs(): + op.ToUnspecifiedDim() + return self + + def Compile(self): + if self.compiled: + return self + self.SetOperandIndex() + self.SetOperandInsAndOuts() + self.TopologicalSort() + self.SetOutputUnspecified() + # Do not check compliance for relaxed mode and dynamic output shape tests. + if self.isRelaxed or self.hasDynamicOutputShape: + self.IntroducedIn(None) + self.compiled = True + return self + +# To track implicitly convertible variation types +class ImplicitVariation: + @staticmethod + def ImplicitConvertion(value): + if isinstance(value, ModelVariation): + return value + for implicitType in ImplicitVariation.__subclasses__(): + value = value if type(value) is tuple or type(value) is list else [value] + if implicitType.IsCompatible(value[0]): + var = implicitType(value[0]) + if len(value) > 1: + var.Identify(*value[1:]) + return var + assert False, "%s not supported for implicit variation"%value[0] + +# The base class for model variations +class ModelVariation: + + def __init__(self, name=None): + self.targetOperands = {} + self.name = name + + def ApplyToHelper(self, model, args, feedDicts, transform): + opVarList = [] + for op in model.GetEquivalentOperands(sorted(args.keys())): + opVar = op + feedDictsVar = [] + if isinstance(op, Input) or isinstance(op, Output): + for feedDict in feedDicts: + op_tmp = copy.deepcopy(op) + if op_tmp in feedDict[0]: + opVar = transform(op_tmp.Feed(feedDict[0]), args[op_tmp]) + elif op_tmp in feedDict[1]: + opVar = transform(op_tmp.Feed(feedDict[1]), args[op_tmp]) + else: + assert False + feedDictsVar.append(opVar.value) + assert type(op) == type(opVar), "Can not handle %s -> %s"%(type(op), type(opVar)) + else: + opVar = transform(op, args[op]) + # handle Parameter -> Input + if isinstance(opVar, Input) or isinstance(opVar, Output): + feedDictsVar = [opVar.value] * len(feedDicts) + if isinstance(opVar, Input) or isinstance(opVar, Output): + for feedDict, feedDictVar in zip(feedDicts, feedDictsVar): + if opVar in feedDict[1]: + feedDict[1][opVar] = feedDictVar + else: + feedDict[0][opVar] = feedDictVar + opVarList.append(opVar) + return opVarList + + # Make a deepcopy of the model and feedDicts, and apply the change + def ApplyTo(self, modelOrigin, feedDictsOrigin): + model, feedDicts = copy.deepcopy((modelOrigin, feedDictsOrigin)) + model.compiled = False + model.dumped = False + + if not self.targetOperands: + self.AutoIdentify(model) + + # get transformed operands and update feedDicts + operandsVar = self.ApplyToHelper( + model, self.targetOperands, feedDicts, self.TransformOperand) + + model = self.TransformModel(model) + model.UpdateEquivalentOperands(operandsVar) + return model, feedDicts + + def IdentifyOperands(self, args=None): + if args is None: + return self + self.targetOperands = args if type(args) is dict else {i: None for i in args} + return self + + def Identify(self, operandArgs=None, paramArgs=None): + self.IdentifyOperands(operandArgs) + return self + + # Set variation to its default name + def SetToDefaultName(self): + self.name = "" + return self + + # Automatically select the target operand list + def AutoIdentify(self, model): + return self + + # Transform operands that are marked by IdentifyOperands() + def TransformOperand(self, op, arg=None): + return op + + # Transform the model + def TransformModel(self, model): + return model + +# Default variation that does nothing +class DefaultVariation(ModelVariation): + + def __init__(self, name=None): + ModelVariation.__init__(self, name=name) + +# Convert operand data type +class DataTypeConverter(ModelVariation, ImplicitVariation): + + def __init__(self, targetType=None, name=None): + ModelVariation.__init__(self, name=name) + if targetType is not None: + assert DataTypeConverter.IsCompatible(targetType) + self.targetType = targetType + + @staticmethod + def IsCompatible(value): + return value.lower() in ["float16", "int32"] + + def SetToDefaultName(self): + if self.targetType is not None: + self.name = self.targetType.lower() + return self + # get all target types + targetTypes = list(zip(*self.targetOperands.values()))[0] + if "TENSOR_QUANT8_SYMM_PER_CHANNEL" in targetTypes: + self.name = "channelQuant8" + elif "TENSOR_QUANT8_ASYMM" in targetTypes: + self.name = "quant8" + elif "TENSOR_INT32" in targetTypes: + self.name = "int32" + elif "TENSOR_FLOAT16" in targetTypes: + self.name = "float16" + else: + self.name = "float32" + return self + + def AutoIdentify(self, model): + if self.targetType is not None: + # By default, select all the float32 tensors/scalars + targets = {op: ["TENSOR_" + self.targetType.upper()] \ + for op in model.operands if op.type.type == "TENSOR_FLOAT32"} + targets.update({op: [self.targetType.upper()] \ + for op in model.operands if op.type.type == "FLOAT32"}) + self.Identify(targets) + return self + + def TransformOperand(self, op, arg=None): + if len(arg) == 1: + typeTuple = (arg[0], op.type.dimensions) + else: + typeTuple = (arg[0], op.type.dimensions, *arg[1:]) + # To handle Internal operands + if op.value is None or op.type.GetNumberOfElements() == 0: + op.type = Type.GetType(*typeTuple) + else: + v = Dequantize(op.GetValueAsNumpy().astype(np.float32), op.type) + op.type = Type.GetType(*typeTuple) + v = Quantize(v, op.type) + op.SetValueFromNumpy(v) + return op + +# Convert model to turn on/off relaxed computation +class RelaxedModeConverter(ModelVariation, ImplicitVariation): + + def __init__(self, isRelaxed=True, name=None): + ModelVariation.__init__(self, name=name) + if isinstance(isRelaxed, bool): + self.isRelaxed = isRelaxed + else: + assert RelaxedModeConverter.IsCompatible(isRelaxed.lower()) + self.isRelaxed = True + + @staticmethod + def IsCompatible(value): + return value.lower() in ["relaxed"] + + def SetToDefaultName(self): + self.name = "relaxed" if self.isRelaxed else "float" + return self + + def TransformModel(self, model): + model.RelaxedExecution(self.isRelaxed) + return model + +# Convert data layout between "NHWC" amd "NCHW" +class DataLayoutConverter(ModelVariation, ImplicitVariation): + + def __init__(self, targetLayout="nchw", name=None): + ModelVariation.__init__(self, name=name) + self.targetLayout = targetLayout.lower() + assert DataLayoutConverter.IsCompatible(self.targetLayout) + self.perm = (0, 3, 1, 2) if self.targetLayout == "nchw" else (0, 2, 3, 1) + self.param = True if self.targetLayout == "nchw" else False + + @staticmethod + def IsCompatible(value): + return value.lower() in ["nhwc", "nchw"] + + def SetToDefaultName(self): + self.name = self.targetLayout + return self + + def TransformOperand(self, op, arg=None): + if len(op.type.dimensions) == 4: + # To handle Internal operands + if op.value is not None and op.type.GetNumberOfElements() != 0: + op.SetValueFromNumpy(op.GetValueAsNumpy().transpose(self.perm)) + newDim = [op.type.dimensions[i] for i in self.perm] + op.type = Type.GetType(op.type.type, newDim, op.type.scale, op.type.zeroPoint) + elif len(op.type.dimensions) == 1 and len(op.value) == 4: + op.SetValueFromNumpy(op.GetValueAsNumpy()[list(self.perm)]) + elif op.type.type == "BOOL": + op.SetValue(self.param) + else: + assert False, "%s not supported by DataLayoutConverter"%op + return op + +# Convert data by tansposing and removing axis +class AxisConverter(ModelVariation): + + def __init__(self, origin, target, dim, drop=[], name=None): + ModelVariation.__init__(self, name=name) + self.origin = origin + self.target = target + assert all(i >= -dim and i < dim for i in [self.origin, self.target]) + self.dim = dim + self.perm = list(range(dim)) + self.perm.insert(target if target >= 0 else target + dim, self.perm.pop(origin)) + self.drop = [drop] if type(drop) is int else list(drop) + assert all(i >= -dim and i < dim for i in self.drop) + self.drop = [i if i >= 0 else i + dim for i in self.drop] + assert target not in self.drop and target + dim not in self.drop + + def SetToDefaultName(self): + axis = self.target if self.target >= 0 else self.target + self.dim + axis -= sum(i < axis for i in self.drop) + neg = "" if self.target >= 0 else "_neg" + self.name = "dim%d_axis%d%s"%(self.dim - len(self.drop), axis, neg) + return self + + def TransposeAxis(self, op): + if op.type.type == "INT32": + op.SetValue(self.target) + elif len(op.type.dimensions) == self.dim: + # To handle Internal operands + if op.value is not None: + op.SetValueFromNumpy(op.GetValueAsNumpy().transpose(self.perm)) + newDim = [op.type.dimensions[i] for i in self.perm] + op.type = Type.GetType(op.type.type, newDim, op.type.scale, op.type.zeroPoint) + else: + assert False, "%s not supported by AxisConverter"%op + return op + + def RemoveAxis(self, op): + if op.type.type == "INT32": + if op.value[0] >= 0: + op.SetValue(op.value[0] - sum(i < op.value[0] for i in self.drop)) + else: + op.SetValue(op.value[0] + sum(i > (op.value[0] + self.dim) for i in self.drop)) + elif len(op.type.dimensions) == self.dim: + if op.value is not None: + val = op.GetValueAsNumpy() + for i in sorted(self.drop, reverse=True): + val = np.take(val, 0, axis=i) + op.SetValueFromNumpy(val) + newDim = [op.type.dimensions[i] for i in range(self.dim) if i not in self.drop] + op.type = Type.GetType(op.type.type, newDim, op.type.scale, op.type.zeroPoint) + else: + assert False, "%s not supported by AxisConverter"%op + return op + + def TransformOperand(self, op, arg=None): + op = self.TransposeAxis(op) + op = self.RemoveAxis(op) + return op + +# Convert a Parameter to Input +class ParameterAsInputConverter(ModelVariation, ImplicitVariation): + + def __init__(self, arg="as_input", prefix="weight", name=None): + ModelVariation.__init__(self, name=name) + assert ParameterAsInputConverter.IsCompatible(arg.lower()) + self.prefix = prefix + + @staticmethod + def IsCompatible(value): + return value.lower() in ["as_input"] + + def SetToDefaultName(self): + self.name = self.prefix + "_as_input" + return self + + def TransformOperand(self, op, arg=None): + assert isinstance(op, Parameter), "%s cannot be converted to Input."%type(op) + newop = Input(op.name, op.type.GetSignatureTuple(), skipRenaming=True, extraParams=op.type.extraParams) + newop.SetValue(op.value) + return newop + +# Convert Output based on activation +class ActivationConverter(ModelVariation, ImplicitVariation): + # (Enum, low, high) + actMap = { + "none": (0, None, None), + "relu": (1, 0.0, None), + "relu1": (2, -1.0, 1.0), + "relu6": (3, 0.0, 6.0), + } + def __init__(self, act="relu", name=None): + ModelVariation.__init__(self, name=name) + self.act = act.lower() + assert ActivationConverter.IsCompatible(self.act) + self.enum = ActivationConverter.actMap[self.act][0] + self.low = ActivationConverter.actMap[self.act][1] + self.high = ActivationConverter.actMap[self.act][2] + + @staticmethod + def IsCompatible(value): + return value.lower() in ActivationConverter.actMap.keys() + + def SetToDefaultName(self): + self.name = self.act + return self + + def TransformOperand(self, op, arg=None): + if op.type.type == "INT32": # activation enum + return op.SetValue(self.enum) + else: + assert isinstance(op, Output) + v = op.GetValueAsNumpy() + if self.low is not None: + low = Quantize(self.low, op.type) + v = np.maximum(v, low) + if self.high is not None: + high = Quantize(self.high, op.type) + v = np.minimum(v, high) + return op.SetValueFromNumpy(v) + +class DynamicOutputShapeConverter(ModelVariation): + def __init__(self, name=None): + ModelVariation.__init__(self, name=name) + + def SetToDefaultName(self): + self.name = "dynamic_output_shape" + return self + + def TransformModel(self, model): + model.TestDynamicOutputShape(True) + return model + +# An example is always attached to a model, and could have multiple variations +class Example: + examples = [] + versionOverrides = {} + + def __init__(self, *args, model=None, name=None): + self.model = Model.models[-1] if model is None else model + self.name = name + self.expectedMultinomialDistributionTolerance = None + self.expectFailure = False + self.feedDicts = [] + for feedDict in args: + if type(feedDict) is tuple or type(feedDict) is list: + self.feedDicts.append(feedDict) + elif type(feedDict) is dict: + self.feedDicts.append(( + {i: feedDict[i] for i in self.model.GetInputs()}, + {o: feedDict[o] for o in self.model.GetOutputs()} + )) + else: + assert False + if Configuration.test_dynamic_output_shape: + self.variations = [[DefaultVariation(), DynamicOutputShapeConverter()]] + else: + self.variations = [] + Example.examples.append(self) + + @staticmethod + def SetVersion(ver, *args): + for name in args: + Example.versionOverrides[name] = ver + + # Main entrance of test generator + @staticmethod + def DumpAllExamples(DumpModel=None, model_fd=None, + DumpExample=None, example_fd=None, + DumpTest=None, test_fd=None): + Example.CombineAllExamples() + for example in Example.examples: + example.Dump(DumpModel, model_fd, DumpExample, example_fd, DumpTest, test_fd) + + # Combine examples with the same model, same name, and same set of variations + @staticmethod + def CombineAllExamples(): + modelMap = {} + newExamples = [] + for example in Example.examples: + key = (example.model, example.name, tuple(tuple(e) for e in example.variations)) + if key in modelMap: + modelMap[key].Combine(example) + else: + modelMap[key] = example + newExamples.append(example) + Example.examples = newExamples + + def AddVariations(self, *args, includeDefault=True, defaultName=None): + self.variations.append([DefaultVariation(defaultName)] if includeDefault else []) + self.variations[-1].extend(ImplicitVariation.ImplicitConvertion(i) for i in args) + return self + + def AddNchw(self, *args, includeDefault=True, defaultName="nhwc"): + var = DataLayoutConverter("nchw").Identify(args) + self.AddVariations(var, includeDefault=includeDefault, defaultName=defaultName) + return self + + def AddRelaxed(self, isRelaxed=True, includeDefault=True, defaultName=None): + var = RelaxedModeConverter(isRelaxed) + self.AddVariations(var, includeDefault=includeDefault, defaultName=defaultName) + return self + + def AddInput(self, *args, includeDefault=True, defaultName=None): + var = ParameterAsInputConverter().Identify(args) + self.AddVariations(var, includeDefault=includeDefault, defaultName=defaultName) + return self + + def AddRelu(self, *args, includeDefault=True, defaultName=None): + var = ActivationConverter("relu").Identify(args) + self.AddVariations(var, includeDefault=includeDefault, defaultName=defaultName) + return self + + def AddAllActivations(self, *args): + var = [ActivationConverter(i).Identify(args) + for i in sorted(ActivationConverter.actMap.keys())] + self.AddVariations(*var, includeDefault=False) + return self + + def GuessOriginalAxisAndDim(self, *args): + origin = None + dim = None + for arg in args: + if arg.type.type == "INT32": + origin = arg.value[0] + else: + if dim is None: + dim = len(arg.type.dimensions) + else: + assert dim == len(arg.type.dimensions) + assert dim is not None + origin = dim - 1 if origin is None else origin + origin = origin + dim if origin < 0 else origin + return origin, dim + + def AddAxis(self, axis, *args, includeDefault=True, defaultName=None): + origin, dim = self.GuessOriginalAxisAndDim(*args) + axis = [axis] if type(axis) is int else list(axis) + var = [AxisConverter(origin, a, dim).Identify(args) for a in axis] + self.AddVariations(*var, includeDefault=includeDefault, defaultName=defaultName) + return self + + def AddAllPositiveAxis(self, *args): + origin, dim = self.GuessOriginalAxisAndDim(*args) + var = [AxisConverter(origin, a, dim).Identify(args) for a in range(dim)] + self.AddVariations(*var, includeDefault=False) + return self + + def AddAllAxis(self, *args): + origin, dim = self.GuessOriginalAxisAndDim(*args) + var = [AxisConverter(origin, a, dim).Identify(args) for a in range(-dim, dim)] + self.AddVariations(*var, includeDefault=False) + return self + + def AddDims(self, dims, *args, includeDefault=True, defaultName=None): + origin, dim = self.GuessOriginalAxisAndDim(*args) + dims = [dims] if type(dims) is int else list(dims) + drop = list(range(dim)) + drop.pop(origin) + var = [AxisConverter(origin, origin, dim, drop[0:(dim-i)]).Identify(args) for i in dims] + self.AddVariations(*var, includeDefault=includeDefault, defaultName=defaultName) + return self + + def AddAllDims(self, *args): + origin, dim = self.GuessOriginalAxisAndDim(*args) + drop = list(range(dim)) + drop.pop(origin) + var = [AxisConverter(origin, origin, dim, drop[0:i]).Identify(args) for i in range(dim)] + self.AddVariations(*var, includeDefault=False) + return self + + def AddAllDimsAndPositiveAxis(self, *args): + origin, dim = self.GuessOriginalAxisAndDim(*args) + var = [AxisConverter(origin, j, dim, range(i)).Identify(args) \ + for i in range(dim) for j in range(i, dim)] + self.AddVariations(*var, includeDefault=False) + return self + + def AddAllDimsAndAxis(self, *args): + origin, dim = self.GuessOriginalAxisAndDim(*args) + var = [AxisConverter(origin, k, dim, range(i)).Identify(args) \ + for i in range(dim) for j in range(i, dim) for k in [j, j - dim]] + self.AddVariations(*var, includeDefault=False) + return self + + def Combine(self, other): + assert self.model is other.model, "Only examples targetting the same model can be combined" + assert tuple(self.variations) == tuple(other.variations), \ + "Only examples with the same set of variations can be combined" + assert self.name == other.name, "Only examples with the same name can be combined" + self.feedDicts.extend(other.feedDicts) + return self + + def Dump(self, DumpModel, model_fd, DumpExample, example_fd, DumpTest, test_fd): + [v.SetToDefaultName() for vs in self.variations for v in vs if v.name is None] + for variationList in itertools.product(*self.variations): + # Apply variations + modelOrigin, feedDictsOrigin = self.model, self.feedDicts + self.model, self.feedDicts = copy.deepcopy((self.model, self.feedDicts)) + for variation in variationList: + self.model, self.feedDicts = variation.ApplyTo(self.model, self.feedDicts) + # Concat names for test and examples + varNames = [v.name for v in variationList] + self.testName = NamedTest(FileNames.specName, self.model.name, self.name, *varNames) + self.examplesName = GlobalVariable("examples", self.model.name, self.name, *varNames) + if str(self.testName) in Example.versionOverrides: + self.model.IntroducedIn(Example.versionOverrides[str(self.testName)]) + self.model.WithSuffix(*varNames).Compile() + # Dump files + if DumpModel is not None and model_fd is not None: + DumpModel(self.model, model_fd) + if DumpExample is not None and example_fd is not None: + DumpExample(self, example_fd) + if DumpTest is not None and test_fd is not None: + DumpTest(self, test_fd) + # Restore model and feedDicts before variation + self.model = modelOrigin + self.feedDicts = feedDictsOrigin + return self + + # Specifies the RANDOM_MULTINOMIAL distribution tolerance. + # If set to greater than zero, the input is compared as log-probabilities + # to the output and must be within this tolerance to pass. + def WithMultinomialDistributionTolerance(self, expectedTolerance): + assert self.expectFailure is False + self.expectedMultinomialDistributionTolerance = expectedTolerance + return self + + # Specifies that this example is expected to fail during compilation or execution. + def ExpectFailure(self): + assert self.expectedMultinomialDistributionTolerance is None + self.expectFailure = True + return self + +class FileNames: + specFiles = [] + specNames = [] + modelFiles = [] + exampleFiles = [] + testFiles = [] + specFile = "" + specName = "" + modelFile = "" + exampleFile = "" + testFile = "" + version = "" + fileIndex = 0 + + @staticmethod + def InitializeFileLists(spec, model, example, test): + # get all spec files and target files + if os.path.isfile(spec): + FileNames.specFiles = [os.path.abspath(spec)] + elif os.path.isdir(spec): + FileNames.specFiles = sorted([os.path.abspath(os.path.join(spec, f)) + for f in os.listdir(spec) if f.endswith(".mod.py")]) + else: + assert False, "%s is neither a file or a directory"%spec + FileNames.specNames = [re.sub(r"\..*", "", os.path.basename(f)) + for f in FileNames.specFiles] + FileNames.modelFiles = FileNames.ParseTargetFiles(model, ".model.cpp") + FileNames.exampleFiles = FileNames.ParseTargetFiles(example, ".example.cpp") + FileNames.testFiles = FileNames.ParseTargetFiles(test, ".mod.py.cpp") + + @staticmethod + def ParseTargetFiles(arg, ext): + numFiles = len(FileNames.specFiles) + absPath = os.path.abspath(arg) + if os.path.isdir(arg): + target = [os.path.join(absPath, f + ext) for f in FileNames.specNames] + elif arg == "-": + target = ["-"] * numFiles + else: + target = [absPath] * numFiles + return target + + @staticmethod + def NextFile(): + if FileNames.fileIndex >= len(FileNames.specFiles): + return False + FileNames.specFile = FileNames.specFiles[FileNames.fileIndex] + FileNames.specName = FileNames.specNames[FileNames.fileIndex] + FileNames.modelFile = FileNames.modelFiles[FileNames.fileIndex] + FileNames.exampleFile = FileNames.exampleFiles[FileNames.fileIndex] + FileNames.testFile = FileNames.testFiles[FileNames.fileIndex] + FileNames.fileIndex += 1 + NamedObject.existingNames = set() + NamedVariable.existingNames = set() + NamedTest.existingNames = set() + Type.typesMap = dict() + Model.models = list() + Example.examples = list() + Configuration.use_shm_for_weights = False + + # Extract version from absolute file path. + versionMatch = re.findall(r"/V\d_\d/", FileNames.specFile) + if len(versionMatch) == 1: + FileNames.version = versionMatch[0].strip('/') + else: + FileNames.version = None + return True + +class Configuration: + use_shm_for_weights = False + force_regenerate = False + test_dynamic_output_shape = True + + @staticmethod + def useSHM(): + return Configuration.use_shm_for_weights |