Module furiosa.quantizer.furiosa_sdk_quantizer.frontend.onnx.spec.export_spec
from typing import List, Dict, Set, Callable, Tuple, Optional, Union
import warnings
import onnx
from onnx import numpy_helper
from furiosa_sdk_quantizer.frontend.onnx.spec import spec_utils
from furiosa_sdk_quantizer.ir import spec
from furiosa_sdk_quantizer.ir.common.operator import HeightWidth, Padding
from furiosa_sdk_quantizer.interfaces.export_spec import ExportSpec
class OnnxExportSpec(ExportSpec):
def __init__(self, model: onnx.ModelProto):
super().__init__()
self.model = model
self.tensor_shapes = self.get_tensor_shapes(self.model)
self.initializer = {init.name: init for init in self.model.graph.initializer}
self.attributes = self.get_attributes(self.model)
# Build producer map
self.producer_map: Dict[str, onnx.NodeProto] = dict()
for node in self.model.graph.node:
for node_output in node.output:
if node_output in self.producer_map:
raise Exception(
"Invalid graph: tensor {} has two or more producers.".format(node_output))
self.producer_map[node_output] = node
# The following are lazily initialized.
self._SKIP_NODE = None
self._MULTI_NODE_SPEC = None
self._SINGLE_NODE_SPEC = None
def export(self) -> Tuple[List[spec.OperatorSpec], Set[str]]:
"""
Traverse graph and export nodes as specs.
Returns (a list of Spec, a set of unsupported ops)
"""
specs: List[spec.OperatorSpec] = list()
unsupported_ops = set()
outputs: List[str] = list(map(lambda output: output.name, self.model.graph.output))
# To prevent traversing cyclic connections
visited: Set[str] = set()
visited_node: List[onnx.NodeProto] = list()
while len(outputs) > 0:
output = outputs.pop(0)
if output not in self.producer_map:
continue
node = self.producer_map[output]
if node.op_type in self.skip_node:
outputs.append(node.input[0])
visited.update([node.input[0]])
continue
# Prevent duplicate specs for nodes with multiple outputs (e.g. Split).
if node in visited_node:
continue
result = self.traverse_multi_node_spec(node)
if result is None:
result = self.traverse_single_node_spec(node)
# Failed to find how to process the node
if result is None:
unsupported_ops.add(node.op_type)
continue
s, inputs = result
# Put spec
specs.append(s)
# Queue the node's predecessors as new outputs to visit
outputs += list(filter(lambda input: input not in visited, inputs))
visited.update(inputs)
visited_node.append(node)
return specs, unsupported_ops
def traverse_single_node_spec(self, node: onnx.NodeProto) -> Optional[Tuple[spec.Spec, List[str]]]:
"""
Returns (Spec, list of inputs of the node)
"""
if node.op_type not in self.single_node_spec:
return None
data_flow_input = list(filter(lambda input: input not in self.initializer.keys(), node.input))
return self.single_node_spec[node.op_type](node), data_flow_input
def traverse_multi_node_spec(self, node: onnx.NodeProto) -> Optional[Tuple[spec.Spec, List[str]]]:
"""
Returns (Spec, list of inputs of the node)
"""
if node.op_type not in self.multi_node_spec:
return None
found = None
for func in self.multi_node_spec[node.op_type]:
result = func(node)
if result is None:
continue
# Warn on ambiguity and keep the first match.
if found is not None:
warnings.warn(
"Found two or more ways of exporting the node {} as a multi-node spec; "
"keeping the first match.".format(node.op_type)
)
return found
found = result
return found
@property
def skip_node(self) -> Set[str]:
if self._SKIP_NODE is None:
self._SKIP_NODE = {'Relu', 'BatchNormalization'}
return self._SKIP_NODE
@property
def multi_node_spec(self) -> Dict[
str, List[Callable[[onnx.NodeProto], Optional[Tuple[spec.Spec, List[str]]]]]]:
if self._MULTI_NODE_SPEC is None:
self._MULTI_NODE_SPEC = {
'Div': [self.multi_node_lp_norm]
}
return self._MULTI_NODE_SPEC
@property
def single_node_spec(self) -> Dict[str, Callable[[onnx.NodeProto], spec.Spec]]:
if self._SINGLE_NODE_SPEC is not None:
return self._SINGLE_NODE_SPEC
self._SINGLE_NODE_SPEC = {
'Conv': self.conv2d,
'ConvTranspose': self.convtranspose2d,
'MaxPool': self.maxpool2d,
'AveragePool': self.avgpool2d,
'GlobalAveragePool': self.avgpool2d,
'Gemm': self.gemm,
'MatMul': self.matmul,
'DepthToSpace': self.depthtospace,
'Resize': self.resize,
'Add': self.add,
'Sub': self.sub,
'Mul': self.mul,
'Div': self.div,
'Exp': self.exp,
'Sigmoid': self.sigmoid,
'Softplus': self.softplus,
'Gelu': self.gelu,
'ReduceMean': self.reduce_mean,
'ReduceSum': self.reduce_sum,
'ReduceL2': self.reduce_l2,
'Squeeze': self.squeeze,
'Unsqueeze': self.unsqueeze,
'Reshape': self.reshape,
'Expand': self.expand,
'Concat': self.concatenation,
'Transpose': self.transpose,
'Slice': self.slice,
'Flatten': self.flatten,
'Pad': self.pad,
'Split': self.split,
'Softmax': self.softmax,
'Clip': self.clip,
'LayerNormalization': self.layer_norm,
'LpNormalization': self.lp_norm,
}
return self._SINGLE_NODE_SPEC
@staticmethod
def get_tensor_shapes(model: onnx.ModelProto) -> Dict[str, Tuple[int, ...]]:
input_shapes = dict()
for vi in list(model.graph.input) + list(model.graph.output) + list(model.graph.value_info):
shape = [int(dim.dim_value) for dim in vi.type.tensor_type.shape.dim]
input_shapes[vi.name] = tuple(shape)
# Include initializers' shapes as well, since initializers also appear as node inputs.
for init in model.graph.initializer:
input_shapes[init.name] = numpy_helper.to_array(init).shape
return input_shapes
@staticmethod
def get_attributes(model: onnx.ModelProto) -> Dict[str, Dict[str, Union[int, float, str, List[int]]]]:
attributes = dict()
for node in model.graph.node:
attrs = dict()
for attr in node.attribute:
if attr.type == 1:  # AttributeProto.FLOAT
attrs[attr.name] = attr.f
elif attr.type == 2:  # AttributeProto.INT
attrs[attr.name] = attr.i
elif attr.type == 3:  # AttributeProto.STRING
attrs[attr.name] = attr.s.decode("utf-8")
elif attr.type == 7:  # AttributeProto.INTS
attrs[attr.name] = attr.ints
else:
raise Exception('Unsupported attribute type: %s' % attr.type)
attributes[node.name] = attrs
return attributes
def get_inputs_for_gen_spec(self, node: onnx.NodeProto) \
-> Tuple[List[Tuple[int, ...]], List[Tuple[int, ...]], Dict]:
input_shapes = []
for input in node.input:
if input == '':
input_shapes.append([])
continue
input_shape = self.tensor_shapes[input]
input_shapes.append(input_shape)
if input in self.initializer.keys():
continue
assert input_shape, \
'input_shape: %s. shape_inference might have failed at %s' % (input_shape, node.name)
output_shapes = []
for output in node.output:
output_shape = self.tensor_shapes[output]
output_shapes.append(output_shape)
assert output_shape, \
'output_shape: %s. shape_inference might have failed at %s' % (output_shape, node.name)
attrs = self.attributes[node.name]
return input_shapes, output_shapes, attrs
def get_initializer_for_gen_spec(self, input_name: str) -> Union[List[int], List[float]]:
return numpy_helper.to_array(self.initializer[input_name]).tolist()
def conv2d(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, output_shapes, attributes = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
output_shape = output_shapes[0]
# TODO: replace assert with a warning; see https://docs.python.org/3/tutorial/errors.html#user-defined-exceptions
# ONNX Conv allows an n-d kernel; only the 2d case is supported here.
assert len(attributes['kernel_shape']) == 2
operator_spec_option = spec.Conv2d(
input=HeightWidth(input_shape[2], input_shape[3]),
kernel=HeightWidth(*attributes['kernel_shape']),
stride=HeightWidth(*attributes.get('strides', (1, 1))),
dilation=HeightWidth(*attributes.get('dilations', (1, 1))),
batch=input_shape[0],
input_channel=input_shape[1],
output_channel=output_shape[1],
groups=attributes.get('group', 1),
padding=Padding(*attributes.get('pads', (0, 0, 0, 0))),
)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def convtranspose2d(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, output_shapes, attributes = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
output_shape = output_shapes[0]
# TODO: replace assert with a warning; see https://docs.python.org/3/tutorial/errors.html#user-defined-exceptions
# ONNX ConvTranspose allows an n-d kernel; only the 2d case is supported here.
assert len(attributes['kernel_shape']) == 2
operator_spec_option = spec.TrasnposeConv(
input=HeightWidth(input_shape[2], input_shape[3]),
kernel=HeightWidth(*attributes['kernel_shape']),
stride=HeightWidth(*attributes.get('strides', (1, 1))),
dilation=HeightWidth(*attributes.get('dilations', (1, 1))),
batch=input_shape[0],
input_channel=input_shape[1],
output_channel=output_shape[1],
groups=attributes.get('group', 1),
padding=Padding(*attributes.get('pads', (0, 0, 0, 0))),
)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def maxpool2d(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, output_shapes, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == len(output_shapes) == 1
input_shape = input_shapes[0]
output_shape = output_shapes[0]
# ONNX MaxPool allows an n-d kernel; only the 2d case is supported here.
assert len(attributes['kernel_shape']) == 2
operator_spec_option = spec.MaxPool2d(
input=HeightWidth(input_shape[2], input_shape[3]),
kernel=HeightWidth(*attributes['kernel_shape']),
stride=HeightWidth(*attributes.get('strides', (1, 1))),
dilation=HeightWidth(*attributes.get('dilations', (1, 1))),
batch=input_shape[0],
channel=output_shape[1],
padding=Padding(*attributes.get('pads', (0, 0, 0, 0))),
)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def avgpool2d(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, output_shapes, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == len(output_shapes) == 1
input_shape = input_shapes[0]
output_shape = output_shapes[0]
# ONNX AveragePool allows an n-d kernel; only the 2d case is supported here.
if node.op_type == 'AveragePool':
assert len(attributes['kernel_shape']) == 2
elif node.op_type == 'GlobalAveragePool':
# GlobalAveragePool has no attributes; the kernel covers the whole spatial extent.
attributes = {'kernel_shape': input_shape[2:]}
operator_spec_option = spec.AveragePool2d(
input=HeightWidth(input_shape[2], input_shape[3]),
kernel=HeightWidth(*attributes['kernel_shape']),
stride=HeightWidth(*attributes.get('strides', (1, 1))),
dilation=HeightWidth(*attributes.get('dilations', (1, 1))),
batch=input_shape[0],
channel=output_shape[1],
padding=Padding(*attributes.get('pads', (0, 0, 0, 0))),
)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def gemm(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
alpha = attributes.get('alpha', 1.0)
beta = attributes.get('beta', 1.0)
m, k, n = spec_utils.gemm_shapes(input_shapes,
attributes.get('transA', 0),
attributes.get('transB', 0))
operator_spec_option = spec.Gemm(alpha=alpha, beta=beta, m=m, k=k, n=n)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def matmul(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 2
lhs_shape, rhs_shape = [*input_shapes[0]], [*input_shapes[1]]
operator_spec_option = spec.MatMul(lhs_shape=lhs_shape, rhs_shape=rhs_shape)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def depthtospace(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
mode = attributes.get('mode', 'DCR')
if mode == 'CRD':
mode = 'ColumnRowDepth'
elif mode == 'DCR':
mode = 'DepthColumnRow'
else:
raise Exception('Unknown mode: %s. Mode must be one of "DCR" or "CRD".' % mode)
operator_spec_option = spec.DepthToSpace(
batch=input_shape[0],
height=input_shape[2],
width=input_shape[3],
channel=input_shape[1],
block_size=attributes['blocksize'],
mode=mode
)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def resize(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
roi = self.get_initializer_for_gen_spec(node.input[1])
scales = self.get_initializer_for_gen_spec(node.input[2])
try:
sizes = self.get_initializer_for_gen_spec(node.input[3])
except IndexError:
sizes = []
operator_spec_option = spec.Resize(shape=[*input_shape], roi=roi, scales=scales, sizes=sizes)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def add(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
operator_spec_option = spec.Add(shape=[*input_shape])
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def sub(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
operator_spec_option = spec.Sub(shape=[*input_shape])
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def mul(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
operator_spec_option = spec.Mul(shape=[*input_shape])
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def div(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
operator_spec_option = spec.Div(shape=[*input_shape])
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def exp(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.Exp(shape=[*input_shape])
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def sigmoid(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.Sigmoid(shape=[*input_shape])
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def softplus(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.Softplus(input_shape=[*input_shape])
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def gelu(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.Gelu(shape=[*input_shape])
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def reduce_mean(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.ReduceMean(shape=[*input_shape],
axes=spec_utils.implicit_axis_to_explicit(
[*attributes['axes']],
input_shape))
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def reduce_sum(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.ReduceSum(shape=[*input_shape],
axes=spec_utils.implicit_axis_to_explicit([*attributes['axes']],
input_shape))
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def reduce_l2(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.ReduceL2(shape=[*input_shape],
axes=spec_utils.implicit_axis_to_explicit([*attributes['axes']],
input_shape))
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def squeeze(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.Squeeze(shape=[*input_shape],
axes=spec_utils.implicit_axis_to_explicit([*attributes['axes']],
input_shape))
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def unsqueeze(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.Unsqueeze(shape=[*input_shape],
axes=spec_utils.implicit_axis_to_explicit([*attributes['axes']],
input_shape))
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def reshape(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, output_shapes, _ = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
output_shape = output_shapes[0]
operator_spec_option = spec.Reshape(input_shape=[*input_shape],
output_shape=[*output_shape])
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def expand(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, output_shapes, _ = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
output_shape = output_shapes[0]
operator_spec_option = spec.Expand(input_shape=[*input_shape],
output_shape=[*output_shape])
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def concatenation(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
operator_spec_option = spec.Concatenation(tensors=list(map(list, input_shapes)),
axis=spec_utils.implicit_axis_to_explicit(
attributes['axis'],
input_shapes[0]))
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def transpose(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.Transpose(shape=[*input_shape],
permutation=spec_utils.implicit_axis_to_explicit(
[*attributes['perm']],
input_shape))
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def slice(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
starts = self.get_initializer_for_gen_spec(node.input[1])
axes = self.get_initializer_for_gen_spec(node.input[3])
operator_spec_option = spec.Slice(shape=[*input_shape],
offset=spec_utils.slice_offset_dict(starts, axes, input_shape))
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def flatten(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.Flatten(shape=[*input_shape],
axis=spec_utils.implicit_axis_to_explicit(attributes['axis'],
input_shape))
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def pad(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
assert len(input_shape) == 4
pads = self.get_initializer_for_gen_spec(node.input[1])
operator_spec_option = spec.Pad(shape=[*input_shape],
pad=spec_utils.horizontal_pads(*pads))
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def layer_norm(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
operator_spec_option = spec.LayerNorm(input_shape=[*input_shapes[0]],
eps=attributes['epsilon'])
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def split(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.Split(shape=[*input_shape], split=[*attributes['split']],
axis=spec_utils.implicit_axis_to_explicit(attributes.get('axis', 0),
input_shape))
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def softmax(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
assert len(input_shapes) == 1
input_shape = input_shapes[0]
operator_spec_option = spec.Softmax(
input_shape=[*input_shape],
beta=attributes.get('beta', 1.0),
axis=spec_utils.implicit_axis_to_explicit(attributes['axis'], input_shape)
)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def clip(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
input_shape = input_shapes[0]
kwargs = {}
if node.attribute:
for attr in node.attribute:
if attr.name == "min":
kwargs['min'] = float(attr.f)
elif attr.name == "max":
kwargs['max'] = float(attr.f)
else:
assert len(node.input) == 3
for idx, node_input in enumerate(node.input):
if idx == 1:
try:
kwargs['min'] = float(numpy_helper.to_array(self.initializer[node_input]))
except KeyError:
kwargs['min'] = None
elif idx == 2:
try:
kwargs['max'] = float(numpy_helper.to_array(self.initializer[node_input]))
except KeyError:
kwargs['max'] = None
if not kwargs:
raise Exception('Clip node has neither min nor max.')
operator_spec_option = spec.Clip(
input_shape=[*input_shape],
**kwargs
)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def lp_norm(self, node: onnx.NodeProto) -> spec.Spec:
input_shapes, _, attrs = self.get_inputs_for_gen_spec(node)
operator_spec_option = spec.LpNorm(
input_shape=[*input_shapes[0]],
**attrs
)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def multi_node_lp_norm(self, node: onnx.NodeProto) -> Optional[Tuple[spec.Spec, List[str]]]:
"""
Starts from 'Div', traverse up to find the form of l2norm.
Returns all inputs of l2norm, consist of multi node
LpNormalization is not defined in ONNX Operator spec, so that we should traverse the graph:
Input --> ReduceL2 --> Clip --> Expand --> D
-----------------------------------> iv --> Output
"""
inputs_of_lp_norm: List[str] = []
for input in node.input:
# Skip inputs that come from initializers; they have no producer.
if input not in self.producer_map:
continue
prev_node = self.producer_map[input]
if prev_node.op_type != 'Expand':
continue
pprev_node = self.producer_map[prev_node.input[0]]
if pprev_node.op_type != 'Clip':
continue
ppprev_node = self.producer_map[pprev_node.input[0]]
if ppprev_node.op_type != 'ReduceL2':
continue
p = 2
inputs_of_lp_norm.append(ppprev_node.input[0])
if not inputs_of_lp_norm:
return None
input_shapes, _, attributes = self.get_inputs_for_gen_spec(ppprev_node)
axis = attributes['axes'][0]
operator_spec_option = spec.LpNorm(input_shape=[*input_shapes[0]], p=p, axis=axis)
return spec.Spec(spec_utils.node_identifier(node), operator_spec_option), inputs_of_lp_norm
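For orientation, a minimal usage sketch follows. It is an illustration under stated assumptions, not part of the module: 'model.onnx' is a hypothetical path, and the model is assumed to have gone through ONNX shape inference, since get_tensor_shapes reads intermediate shapes from graph.value_info.
import onnx
from onnx import shape_inference
from furiosa_sdk_quantizer.frontend.onnx.spec.export_spec import OnnxExportSpec

# 'model.onnx' is a hypothetical path; infer_shapes populates value_info so
# that get_tensor_shapes can see every intermediate tensor shape.
model = shape_inference.infer_shapes(onnx.load('model.onnx'))
specs, unsupported_ops = OnnxExportSpec(model).export()
print('exported %d operator specs' % len(specs))
if unsupported_ops:
    print('no spec exporter for:', ', '.join(sorted(unsupported_ops)))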
Classes
class OnnxExportSpec (model: onnx.onnx_ml_pb2.ModelProto)
Exports operator specs from an ONNX model by traversing the graph from its outputs back to its inputs.
Ancestors
- furiosa_sdk_quantizer.interfaces.export_spec.ExportSpec
Static methods
def get_attributes(model: onnx.onnx_ml_pb2.ModelProto) ‑> Dict[str, Dict[str, Union[int, float, str, List[int]]]]
Collects each node's attributes into a dict keyed by node name.
def get_tensor_shapes(model: onnx.onnx_ml_pb2.ModelProto) ‑> Dict[str, Tuple[int, ...]]
Maps every tensor name (graph inputs, outputs, value_info, and initializers) to its shape.
Instance variables
var multi_node_spec : Dict[str, List[Callable[[onnx.onnx_ml_pb2.NodeProto], Union[Tuple[furiosa_sdk_quantizer.ir.spec.Spec, List[str]], NoneType]]]]
Lazily built mapping from op type to candidate multi-node spec exporters; currently only 'Div' (L2 normalization).
var single_node_spec : Dict[str, Callable[[onnx.onnx_ml_pb2.NodeProto], furiosa_sdk_quantizer.ir.spec.Spec]]
Lazily built mapping from op type to its single-node spec exporter.
var skip_node : Set[str]
Op types bypassed during traversal: {'Relu', 'BatchNormalization'}.
Methods
def add(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def avgpool2d(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def clip(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def concatenation(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def conv2d(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def convtranspose2d(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def depthtospace(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def div(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def exp(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def expand(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def export(self) ‑> typing.Tuple[typing.List[furiosa_sdk_quantizer.ir.spec.OperatorSpec], typing.Set[str]]
Traverse graph and export nodes as specs. Returns (a list of Spec, a set of unsupported ops)
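A short, hedged example of consuming the result; model is a shape-inferred onnx.ModelProto, as in the sketch after the module source:
specs, unsupported_ops = OnnxExportSpec(model).export()
if unsupported_ops:
    print('ops lacking a spec exporter:', sorted(unsupported_ops))
for s in specs:
    print(s)  # each element is a furiosa_sdk_quantizer.ir.spec.Spec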
def flatten(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def gelu(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def gemm(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
def get_initializer_for_gen_spec(self, input_name: str) ‑> Union[List[int], List[float]]
-
Expand source code
def get_initializer_for_gen_spec(self, input_name: str) -> Union[List[int], List[float]]:
    # NOTE: the original annotation `List[int] or List[float]` is a boolean
    # expression that simply evaluates to List[int]; Union (added to the
    # typing import) expresses the intent correctly.
    return numpy_helper.to_array(self.initializer[input_name]).tolist()
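The round trip below illustrates what this helper returns: an initializer is stored as a TensorProto, and numpy_helper.to_array(...).tolist() yields plain Python scalars.

import numpy as np
from onnx import numpy_helper

# Build a TensorProto the way it would appear in graph.initializer.
tensor = numpy_helper.from_array(np.array([1, 2, 3], dtype=np.int64), name="pads")
assert numpy_helper.to_array(tensor).tolist() == [1, 2, 3]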
def get_inputs_for_gen_spec(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> Tuple[List[Tuple[int]], List[Tuple[int]], Dict]
-
Expand source code
def get_inputs_for_gen_spec(self, node: onnx.NodeProto) \
        -> Tuple[List[Tuple[int]], List[Tuple[int]], Dict]:
    input_shapes = []
    for input in node.input:
        if input == '':
            input_shapes.append([])
            continue
        input_shape = self.tensor_shapes[input]
        input_shapes.append(input_shape)
        if input in self.initializer.keys():
            continue
        assert input_shape, \
            'input_shape: %s. shape_inference might have failed at %s' % (input_shape, node.name)
    output_shapes = []
    for output in node.output:
        output_shape = self.tensor_shapes[output]
        output_shapes.append(output_shape)
        assert output_shape, \
            'output_shape: %s. shape_inference might have failed at %s' % (output_shape, node.name)
    attrs = self.attributes[node.name]
    return input_shapes, output_shapes, attrs
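get_tensor_shapes itself is not shown in this excerpt, and the asserts above only hold if every tensor carries a concrete shape. A plausible way to build such a name-to-shape map with onnx.shape_inference is sketched below; this is an assumption about the surrounding machinery, not the module's actual implementation.

from typing import Dict, Tuple
import onnx
from onnx import shape_inference

def tensor_shapes_of(model: onnx.ModelProto) -> Dict[str, Tuple[int, ...]]:
    inferred = shape_inference.infer_shapes(model)
    graph = inferred.graph
    shapes: Dict[str, Tuple[int, ...]] = {}
    for vi in list(graph.value_info) + list(graph.input) + list(graph.output):
        # Symbolic dimensions (dim_param) come out as 0 here.
        shapes[vi.name] = tuple(d.dim_value for d in vi.type.tensor_type.shape.dim)
    for init in graph.initializer:
        shapes[init.name] = tuple(init.dims)
    return shapes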
def layer_norm(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def layer_norm(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
    operator_spec_option = spec.LayerNorm(input_shape=[*input_shapes[0]], eps=attributes['epsilon'])
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def lp_norm(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def lp_norm(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, attrs = self.get_inputs_for_gen_spec(node)
    # The original spread the whole list of input shapes into `input_shape`;
    # consistent with multi_node_lp_norm, LpNorm expects the first input's
    # shape only.
    operator_spec_option = spec.LpNorm(
        input_shape=[*input_shapes[0]],
        **attrs
    )
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def matmul(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def matmul(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == 2
    lhs_shape, rhs_shape = [*input_shapes[0]], [*input_shapes[1]]
    operator_spec_option = spec.MatMul(lhs_shape=lhs_shape, rhs_shape=rhs_shape)
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def maxpool2d(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def maxpool2d(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, output_shapes, attributes = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == len(output_shapes) == 1
    input_shape = input_shapes[0]
    output_shape = output_shapes[0]
    # ONNX MaxPool accepts an n-d kernel; only 2-d kernels are supported here.
    assert len(attributes['kernel_shape']) == 2
    operator_spec_option = spec.MaxPool2d(
        input=HeightWidth(input_shape[2], input_shape[3]),
        kernel=HeightWidth(*attributes['kernel_shape']),
        stride=HeightWidth(*attributes.get('strides', (1, 1))),
        dilation=HeightWidth(*attributes.get('dilations', (1, 1))),
        batch=input_shape[0],
        channel=output_shape[1],
        padding=Padding(*attributes.get('pads', (0, 0, 0, 0))),
    )
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
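As a sanity check on the fields above, the standard ONNX pooling arithmetic (ceil_mode=0) relates input extent, kernel, stride, dilation, and padding to the output extent. A small sketch:

def pooled_extent(size: int, kernel: int, stride: int = 1,
                  dilation: int = 1, pad_begin: int = 0, pad_end: int = 0) -> int:
    # Effective kernel extent grows with dilation; floor division models
    # ceil_mode=0.
    effective_kernel = dilation * (kernel - 1) + 1
    return (size + pad_begin + pad_end - effective_kernel) // stride + 1

# A 3x3 kernel with stride 2 and padding 1 halves a 224-wide extent to 112.
assert pooled_extent(224, kernel=3, stride=2, pad_begin=1, pad_end=1) == 112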
def mul(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def mul(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
    input_shape = input_shapes[0]
    operator_spec_option = spec.Mul(shape=[*input_shape])
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def multi_node_lp_norm(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> Union[Tuple[furiosa_sdk_quantizer.ir.spec.Spec, List[str]], NoneType]
-
Starting from 'Div', traverses up the graph to find the L2-norm pattern. Returns all inputs of the L2 norm, which consists of multiple nodes.
LpNormalization is not defined in the ONNX operator spec, so the graph must be traversed:
Input --> ReduceL2 --> Clip --> Expand --> Div --> Output
      ----------------------------------->
Expand source code
def multi_node_lp_norm(self, node: onnx.NodeProto) -> Optional[Tuple[spec.Spec, List[str]]]:
    """
    Starting from 'Div', traverses up the graph to find the L2-norm pattern.
    Returns all inputs of the L2 norm, which consists of multiple nodes.
    LpNormalization is not defined in the ONNX operator spec, so the graph
    must be traversed:
        Input --> ReduceL2 --> Clip --> Expand --> Div --> Output
              ----------------------------------->
    """
    inputs_of_lp_norm: List[str] = []
    for input in node.input:
        # exclude inputs that come from the initializer
        if input not in self.producer_map:
            continue
        prev_node = self.producer_map[input]
        if prev_node.op_type != 'Expand':
            continue
        pprev_node = self.producer_map[prev_node.input[0]]
        if pprev_node.op_type != 'Clip':
            continue
        ppprev_node = self.producer_map[pprev_node.input[0]]
        if ppprev_node.op_type != 'ReduceL2':
            continue
        p = 2
        inputs_of_lp_norm.append(ppprev_node.input[0])
        input_shapes, _, attributes = self.get_inputs_for_gen_spec(ppprev_node)
        axis = attributes['axes'][0]
        operator_spec_option = spec.LpNorm(input_shape=[*input_shapes[0]], p=p, axis=axis)
        return spec.Spec(spec_utils.node_identifier(node), operator_spec_option), inputs_of_lp_norm
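To make the matched pattern concrete, the sketch below builds the same subgraph with onnx.helper (tensor names and shapes are illustrative). This is the shape of graph the matcher walks backwards through producer_map: Div, then Expand, then Clip, then ReduceL2.

import numpy as np
from onnx import helper, numpy_helper, TensorProto

X = helper.make_tensor_value_info("x", TensorProto.FLOAT, [1, 128])
Y = helper.make_tensor_value_info("y", TensorProto.FLOAT, [1, 128])
nodes = [
    helper.make_node("ReduceL2", ["x"], ["norm"], axes=[1], keepdims=1),
    helper.make_node("Clip", ["norm"], ["clipped"]),  # min/max inputs omitted
    helper.make_node("Expand", ["clipped", "target_shape"], ["denom"]),
    helper.make_node("Div", ["x", "denom"], ["y"]),
]
shape_init = numpy_helper.from_array(
    np.array([1, 128], dtype=np.int64), name="target_shape")
graph = helper.make_graph(nodes, "l2norm_pattern", [X], [Y],
                          initializer=[shape_init])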
def pad(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def pad(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
    input_shape = input_shapes[0]
    assert len(input_shape) == 4
    pads = self.get_initializer_for_gen_spec(node.input[1])
    operator_spec_option = spec.Pad(shape=[*input_shape], pad=spec_utils.horizontal_pads(*pads))
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def reduce_l2(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def reduce_l2(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == 1
    input_shape = input_shapes[0]
    operator_spec_option = spec.ReduceL2(
        shape=[*input_shape],
        axes=spec_utils.implicit_axis_to_explicit([*attributes['axes']], input_shape))
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def reduce_mean(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def reduce_mean(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == 1
    input_shape = input_shapes[0]
    operator_spec_option = spec.ReduceMean(
        shape=[*input_shape],
        axes=spec_utils.implicit_axis_to_explicit([*attributes['axes']], input_shape))
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def reduce_sum(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def reduce_sum(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == 1
    input_shape = input_shapes[0]
    operator_spec_option = spec.ReduceSum(
        shape=[*input_shape],
        axes=spec_utils.implicit_axis_to_explicit([*attributes['axes']], input_shape))
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def reshape(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def reshape(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, output_shapes, _ = self.get_inputs_for_gen_spec(node)
    input_shape = input_shapes[0]
    output_shape = output_shapes[0]
    operator_spec_option = spec.Reshape(input_shape=[*input_shape], output_shape=[*output_shape])
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def resize(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def resize(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
    input_shape = input_shapes[0]
    roi = self.get_initializer_for_gen_spec(node.input[1])
    scales = self.get_initializer_for_gen_spec(node.input[2])
    try:
        sizes = self.get_initializer_for_gen_spec(node.input[3])
    except IndexError:
        # The 'sizes' input is optional in ONNX Resize.
        sizes = []
    operator_spec_option = spec.Resize(shape=[*input_shape], roi=roi, scales=scales, sizes=sizes)
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def sigmoid(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def sigmoid(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == 1
    input_shape = input_shapes[0]
    operator_spec_option = spec.Sigmoid(shape=[*input_shape])
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def slice(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def slice(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
    input_shape = input_shapes[0]
    starts = self.get_initializer_for_gen_spec(node.input[1])
    axes = self.get_initializer_for_gen_spec(node.input[3])
    operator_spec_option = spec.Slice(
        shape=[*input_shape],
        offset=spec_utils.slice_offset_dict(starts, axes, input_shape))
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
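spec_utils.slice_offset_dict is not shown here. Under the assumption that it maps each (possibly negative) axis named by the node to its start offset, with unmentioned axes defaulting to 0, it would behave like the hypothetical re-implementation below.

from typing import Dict, List, Sequence

def slice_offset_dict(starts: List[int], axes: List[int],
                      input_shape: Sequence[int]) -> Dict[int, int]:
    # Hypothetical sketch: explicit axis -> start offset, defaulting to 0.
    offsets = {axis: 0 for axis in range(len(input_shape))}
    for start, axis in zip(starts, axes):
        offsets[axis % len(input_shape)] = start  # normalizes negative axes
    return offsets

assert slice_offset_dict([2], [-1], (1, 3, 8)) == {0: 0, 1: 0, 2: 2}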
def softmax(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def softmax(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == 1
    input_shape = input_shapes[0]
    operator_spec_option = spec.Softmax(
        input_shape=[*input_shape],
        beta=attributes.get('beta', 1.0),
        axis=spec_utils.implicit_axis_to_explicit(attributes['axis'], input_shape)
    )
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def softplus(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def softplus(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == 1
    input_shape = input_shapes[0]
    operator_spec_option = spec.Softplus(input_shape=[*input_shape])
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def split(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def split(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == 1
    input_shape = input_shapes[0]
    operator_spec_option = spec.Split(
        shape=[*input_shape],
        split=[*attributes['split']],
        axis=spec_utils.implicit_axis_to_explicit(attributes.get('axis', 0), input_shape))
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def squeeze(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def squeeze(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == 1
    input_shape = input_shapes[0]
    operator_spec_option = spec.Squeeze(
        shape=[*input_shape],
        axes=spec_utils.implicit_axis_to_explicit([*attributes['axes']], input_shape))
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def sub(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def sub(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, _ = self.get_inputs_for_gen_spec(node)
    input_shape = input_shapes[0]
    operator_spec_option = spec.Sub(shape=[*input_shape])
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def transpose(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def transpose(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == 1
    input_shape = input_shapes[0]
    operator_spec_option = spec.Transpose(
        shape=[*input_shape],
        permutation=spec_utils.implicit_axis_to_explicit([*attributes['perm']], input_shape))
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)
def traverse_multi_node_spec(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> Union[Tuple[furiosa_sdk_quantizer.ir.spec.Spec, List[str]], NoneType]
-
Returns (Spec, list of inputs of the node)
Expand source code
def traverse_multi_node_spec(self, node: onnx.NodeProto) -> Optional[Tuple[spec.Spec, List[str]]]:
    """
    Returns (Spec, list of inputs of the node)
    """
    if node.op_type not in self.multi_node_spec:
        return None
    found = None
    for func in self.multi_node_spec[node.op_type]:
        result = func(node)
        if result is None:
            continue
        # Check for ambiguity: two matchers must not claim the same node.
        if found is not None:
            warnings.warn(
                "Found two or more ways of exporting a spec from multi-node "
                "patterns for the node {}".format(node.op_type)
            )
            return found
        found = result
    return found
def traverse_single_node_spec(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> Union[Tuple[furiosa_sdk_quantizer.ir.spec.Spec, List[str]], NoneType]
-
Returns (Spec, list of inputs of the node)
Expand source code
def traverse_single_node_spec(self, node: onnx.NodeProto) -> Optional[Tuple[spec.Spec, List[str]]]:
    """
    Returns (Spec, list of inputs of the node)
    """
    if node.op_type not in self.single_node_spec:
        return None
    data_flow_input = list(filter(lambda input: input not in self.initializer.keys(), node.input))
    return self.single_node_spec[node.op_type](node), data_flow_input
def unsqueeze(self, node: onnx.onnx_ml_pb2.NodeProto) ‑> furiosa_sdk_quantizer.ir.spec.Spec
-
Expand source code
def unsqueeze(self, node: onnx.NodeProto) -> spec.Spec:
    input_shapes, _, attributes = self.get_inputs_for_gen_spec(node)
    assert len(input_shapes) == 1
    input_shape = input_shapes[0]
    operator_spec_option = spec.Unsqueeze(
        shape=[*input_shape],
        axes=spec_utils.implicit_axis_to_explicit([*attributes['axes']], input_shape))
    return spec.Spec(spec_utils.node_identifier(node), operator_spec_option)