Module furiosa.quantizer.frontend.onnx.transformer.fuse_pad
Expand source code
import abc
import onnx
from furiosa_sdk_quantizer.interfaces.transformer import Transformer
from furiosa_sdk_quantizer.frontend.onnx.transformer import ONNXTransformer
class FusePad(Transformer):
def transform(self, model: onnx.ModelProto) -> onnx.ModelProto:
for transformer in [
Pattern_1,
Pattern_2
]:
model = transformer(model).transform()
return model
class Pattern_1(ONNXTransformer, abc.ABC):
"""
transform
prev --> Pad --> MaxPool --> next
to
prev --> MaxPool --> next
if 1. Pad.mode == 'constant'
2. Pad.constant_value == -inf
3. padded on spatial dimension
4. fused_pads[i] < kernel_shape[i] and fused_pads[i + kernel_rank] < kernel_shape[i] for all i
"""
pattern_to_match = ['Pad', 'MaxPool']
def pattern_matching(self, base_node):
inputs = base_node.input
matched_nodes = self.pattern_matcher(base_node, self.pattern_to_match)
if not matched_nodes:
return inputs
if not self.pattern_condition_checker(matched_nodes):
return inputs
self.transform_to_fuse(matched_nodes,
nodes_to_add=[
self.make_new_node(matched_nodes)
])
return matched_nodes[0].input
def pattern_condition_checker(self, nodes_to_check):
top_node, base_node = nodes_to_check
if not self.check_condition_1(top_node.attribute):
return False
if not self.check_condition_2(top_node):
return False
if not self.check_condition_3(top_node.input[1]):
return False
if not self.check_condition_6(self.get_attrs(base_node), top_node.input[1]):
return False
return True
def check_condition_1(self, node_attr):
if self.get_pad_mode(node_attr) == 'constant':
return True
return False
def check_condition_2(self, node):
try:
const_input = node.input[2]
constant_value = self.get_initializer_array(const_input)
except IndexError:
constant_value = 0.0
if constant_value == float('-inf'):
return True
return False
def check_condition_3(self, pads_input):
pads = self.get_initializer_array(pads_input)
rank = len(pads) // 2
pads_on_nc_dim = [*pads[:2], *pads[rank:rank + 2]]
if all(pad == 0 for pad in pads_on_nc_dim):
return True
return False
def check_condition_6(self, node_attrs, pad_input):
attrs = self.update_attrs(node_attrs, pad_input)
kernel_shape = attrs['kernel_shape']
kernel_rank = len(kernel_shape)
fused_pads = attrs['pads']
fused_pad_shape = [sum([fused_pads[dim], fused_pads[dim + kernel_rank]]) for dim in range(kernel_rank)]
assert len(kernel_shape) == len(fused_pad_shape)
if all(fused_pads[dim] < k and fused_pads[dim + kernel_rank] < k
for dim, k in enumerate(kernel_shape)):
return True
return False
def get_pad_mode(self, node_attr):
from furiosa_sdk_quantizer.frontend.onnx.quantizer.utils import attribute_to_kwargs
return attribute_to_kwargs(node_attr).get('mode', 'constant').decode("utf-8")
def update_attrs(self, attrs, pad_input):
pads = [sum(x) for x in zip(attrs['pads'], self.make_maxpool_pad(pad_input))]
attrs['pads'] = pads
return attrs
def get_attrs(self, node):
rank = len(self.get_value_info_shape(node.input[0]))
nspatial_dim = (rank - 2)
from furiosa_sdk_quantizer.frontend.onnx.quantizer.utils import attribute_to_kwargs
attrs = attribute_to_kwargs(node.attribute)
ceil_mode = attrs.get('ceil_mode', 0)
dilations = attrs.get('dilations', [1, ] * nspatial_dim)
kernel_shape = attrs['kernel_shape']
strides = attrs.get('strides', [1, ] * nspatial_dim)
pads = attrs.get('pads', [0, ] * nspatial_dim * 2)
return {'ceil_mode': ceil_mode, 'dilations': dilations,
'kernel_shape': kernel_shape,
'pads': pads, 'strides': strides}
def make_maxpool_pad(self, pad_input):
pads = self.get_initializer_array(pad_input)
rank = len(pads) // 2
new_pads = []
for pad in pads:
if pad == -1:
new_pads.append(0)
else:
new_pads.append(pad)
pads = new_pads
return [*pads[2:rank], *pads[rank + 2:2 * rank]]
def make_new_node(self, matched_nodes):
top_node, base_node = matched_nodes
attrs = self.update_attrs(self.get_attrs(base_node), top_node.input[1])
return self.make_node('MaxPool', [top_node.input[0]], [base_node.output[0]],
name=top_node.name, **attrs)
class Pattern_2(Pattern_1, abc.ABC):
"""
transform
prev --> Pad --> AveragePool --> next
to
prev --> AveragePool --> next
if 1. Pad.mode == 'constant'
2. Pad.constant_value == 0.0
3. padded on spatial dimension
4. AveragePool.count_include_pad == 1 or all AveragePool.pads == 0
5. AveragePool.ceil_mode == 0
6. fused_pads[i] < kernel_shape[i] and fused_pads[i + kernel_rank] < kernel_shape[i] for all i
"""
pattern_to_match = ['Pad', 'AveragePool']
def pattern_condition_checker(self, matched_nodes):
top_node, base_node = matched_nodes
if not self.check_condition_1(top_node.attribute):
return False
if not self.check_condition_2(top_node):
return False
if not self.check_condition_3(top_node.input[1]):
return False
if not self.check_condition_4(base_node):
return False
if not self.check_condition_5(base_node):
return False
if not self.check_condition_6(self.get_attrs(base_node), top_node.input[1]):
return False
return True
def get_attrs(self, node):
rank = len(self.get_value_info_shape(node.input[0]))
nspatial_dim = (rank - 2)
from furiosa_sdk_quantizer.frontend.onnx.quantizer.utils import attribute_to_kwargs
attrs = attribute_to_kwargs(node.attribute)
ceil_mode = attrs.get('ceil_mode', 0)
count_include_pad = attrs.get('count_include_pad', 0)
kernel_shape = attrs['kernel_shape']
strides = attrs.get('strides', [1, ] * nspatial_dim)
pads = attrs.get('pads', [0, ] * nspatial_dim * 2)
return {'ceil_mode': ceil_mode, 'count_include_pad': count_include_pad,
'kernel_shape': kernel_shape,
'pads': pads, 'strides': strides}
def update_attrs(self, attrs, pad_input):
pads = [sum(x) for x in zip(attrs['pads'], self.make_maxpool_pad(pad_input))]
attrs['pads'] = pads
attrs['count_include_pad'] = 1
return attrs
def check_condition_2(self, node):
try:
const_input = node.input[2]
constant_value = self.get_initializer_array(const_input)
except IndexError:
constant_value = 0.0
if constant_value == 0.0:
return True
return False
def check_condition_4(self, node):
attrs = self.get_attrs(node)
count_include_pad = attrs['count_include_pad']
pads = attrs['pads']
if count_include_pad == 1 or all(pad == 0 for pad in pads):
return True
return False
def check_condition_5(self, node):
attrs = self.get_attrs(node)
ceil_mode = attrs['ceil_mode']
if ceil_mode == 0:
return True
return False
def make_new_node(self, matched_nodes):
top_node, base_node = matched_nodes
attrs = self.update_attrs(self.get_attrs(base_node), top_node.input[1])
return self.make_node('AveragePool', [top_node.input[0]], [base_node.output[0]],
name=top_node.name, **attrs)
Classes
class FusePad (*args, **kwds)
-
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def getitem(self, key: KT) -> VT: … # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Expand source code
class FusePad(Transformer): def transform(self, model: onnx.ModelProto) -> onnx.ModelProto: for transformer in [ Pattern_1, Pattern_2 ]: model = transformer(model).transform() return model
Ancestors
- furiosa_sdk_quantizer.interfaces.transformer.Transformer
- typing.Generic
Methods
def transform(self, model: onnx.onnx_ml_pb2.ModelProto) ‑> onnx.onnx_ml_pb2.ModelProto
-
Expand source code
def transform(self, model: onnx.ModelProto) -> onnx.ModelProto: for transformer in [ Pattern_1, Pattern_2 ]: model = transformer(model).transform() return model
class Pattern_1 (model)
-
transform prev –> Pad –> MaxPool –> next to prev –> MaxPool –> next
if 1. Pad.mode == 'constant' 2. Pad.constant_value == -inf 3. padded on spatial dimension 4. fused_pads[i] < kernel_shape[i] and fused_pads[i + kernel_rank] < kernel_shape[i] for all i
Expand source code
class Pattern_1(ONNXTransformer, abc.ABC): """ transform prev --> Pad --> MaxPool --> next to prev --> MaxPool --> next if 1. Pad.mode == 'constant' 2. Pad.constant_value == -inf 3. padded on spatial dimension 4. fused_pads[i] < kernel_shape[i] and fused_pads[i + kernel_rank] < kernel_shape[i] for all i """ pattern_to_match = ['Pad', 'MaxPool'] def pattern_matching(self, base_node): inputs = base_node.input matched_nodes = self.pattern_matcher(base_node, self.pattern_to_match) if not matched_nodes: return inputs if not self.pattern_condition_checker(matched_nodes): return inputs self.transform_to_fuse(matched_nodes, nodes_to_add=[ self.make_new_node(matched_nodes) ]) return matched_nodes[0].input def pattern_condition_checker(self, nodes_to_check): top_node, base_node = nodes_to_check if not self.check_condition_1(top_node.attribute): return False if not self.check_condition_2(top_node): return False if not self.check_condition_3(top_node.input[1]): return False if not self.check_condition_6(self.get_attrs(base_node), top_node.input[1]): return False return True def check_condition_1(self, node_attr): if self.get_pad_mode(node_attr) == 'constant': return True return False def check_condition_2(self, node): try: const_input = node.input[2] constant_value = self.get_initializer_array(const_input) except IndexError: constant_value = 0.0 if constant_value == float('-inf'): return True return False def check_condition_3(self, pads_input): pads = self.get_initializer_array(pads_input) rank = len(pads) // 2 pads_on_nc_dim = [*pads[:2], *pads[rank:rank + 2]] if all(pad == 0 for pad in pads_on_nc_dim): return True return False def check_condition_6(self, node_attrs, pad_input): attrs = self.update_attrs(node_attrs, pad_input) kernel_shape = attrs['kernel_shape'] kernel_rank = len(kernel_shape) fused_pads = attrs['pads'] fused_pad_shape = [sum([fused_pads[dim], fused_pads[dim + kernel_rank]]) for dim in range(kernel_rank)] assert len(kernel_shape) == len(fused_pad_shape) if all(fused_pads[dim] < k and fused_pads[dim + kernel_rank] < k for dim, k in enumerate(kernel_shape)): return True return False def get_pad_mode(self, node_attr): from furiosa_sdk_quantizer.frontend.onnx.quantizer.utils import attribute_to_kwargs return attribute_to_kwargs(node_attr).get('mode', 'constant').decode("utf-8") def update_attrs(self, attrs, pad_input): pads = [sum(x) for x in zip(attrs['pads'], self.make_maxpool_pad(pad_input))] attrs['pads'] = pads return attrs def get_attrs(self, node): rank = len(self.get_value_info_shape(node.input[0])) nspatial_dim = (rank - 2) from furiosa_sdk_quantizer.frontend.onnx.quantizer.utils import attribute_to_kwargs attrs = attribute_to_kwargs(node.attribute) ceil_mode = attrs.get('ceil_mode', 0) dilations = attrs.get('dilations', [1, ] * nspatial_dim) kernel_shape = attrs['kernel_shape'] strides = attrs.get('strides', [1, ] * nspatial_dim) pads = attrs.get('pads', [0, ] * nspatial_dim * 2) return {'ceil_mode': ceil_mode, 'dilations': dilations, 'kernel_shape': kernel_shape, 'pads': pads, 'strides': strides} def make_maxpool_pad(self, pad_input): pads = self.get_initializer_array(pad_input) rank = len(pads) // 2 new_pads = [] for pad in pads: if pad == -1: new_pads.append(0) else: new_pads.append(pad) pads = new_pads return [*pads[2:rank], *pads[rank + 2:2 * rank]] def make_new_node(self, matched_nodes): top_node, base_node = matched_nodes attrs = self.update_attrs(self.get_attrs(base_node), top_node.input[1]) return self.make_node('MaxPool', [top_node.input[0]], [base_node.output[0]], name=top_node.name, **attrs)
Ancestors
- furiosa_sdk_quantizer.frontend.onnx.transformer.ONNXTransformer
- abc.ABC
Subclasses
Class variables
var pattern_to_match
Methods
def check_condition_1(self, node_attr)
-
Expand source code
def check_condition_1(self, node_attr): if self.get_pad_mode(node_attr) == 'constant': return True return False
def check_condition_2(self, node)
-
Expand source code
def check_condition_2(self, node): try: const_input = node.input[2] constant_value = self.get_initializer_array(const_input) except IndexError: constant_value = 0.0 if constant_value == float('-inf'): return True return False
def check_condition_3(self, pads_input)
-
Expand source code
def check_condition_3(self, pads_input): pads = self.get_initializer_array(pads_input) rank = len(pads) // 2 pads_on_nc_dim = [*pads[:2], *pads[rank:rank + 2]] if all(pad == 0 for pad in pads_on_nc_dim): return True return False
def check_condition_6(self, node_attrs, pad_input)
-
Expand source code
def check_condition_6(self, node_attrs, pad_input): attrs = self.update_attrs(node_attrs, pad_input) kernel_shape = attrs['kernel_shape'] kernel_rank = len(kernel_shape) fused_pads = attrs['pads'] fused_pad_shape = [sum([fused_pads[dim], fused_pads[dim + kernel_rank]]) for dim in range(kernel_rank)] assert len(kernel_shape) == len(fused_pad_shape) if all(fused_pads[dim] < k and fused_pads[dim + kernel_rank] < k for dim, k in enumerate(kernel_shape)): return True return False
def get_attrs(self, node)
-
Expand source code
def get_attrs(self, node): rank = len(self.get_value_info_shape(node.input[0])) nspatial_dim = (rank - 2) from furiosa_sdk_quantizer.frontend.onnx.quantizer.utils import attribute_to_kwargs attrs = attribute_to_kwargs(node.attribute) ceil_mode = attrs.get('ceil_mode', 0) dilations = attrs.get('dilations', [1, ] * nspatial_dim) kernel_shape = attrs['kernel_shape'] strides = attrs.get('strides', [1, ] * nspatial_dim) pads = attrs.get('pads', [0, ] * nspatial_dim * 2) return {'ceil_mode': ceil_mode, 'dilations': dilations, 'kernel_shape': kernel_shape, 'pads': pads, 'strides': strides}
def get_pad_mode(self, node_attr)
-
Expand source code
def get_pad_mode(self, node_attr): from furiosa_sdk_quantizer.frontend.onnx.quantizer.utils import attribute_to_kwargs return attribute_to_kwargs(node_attr).get('mode', 'constant').decode("utf-8")
def make_maxpool_pad(self, pad_input)
-
Expand source code
def make_maxpool_pad(self, pad_input): pads = self.get_initializer_array(pad_input) rank = len(pads) // 2 new_pads = [] for pad in pads: if pad == -1: new_pads.append(0) else: new_pads.append(pad) pads = new_pads return [*pads[2:rank], *pads[rank + 2:2 * rank]]
def make_new_node(self, matched_nodes)
-
Expand source code
def make_new_node(self, matched_nodes): top_node, base_node = matched_nodes attrs = self.update_attrs(self.get_attrs(base_node), top_node.input[1]) return self.make_node('MaxPool', [top_node.input[0]], [base_node.output[0]], name=top_node.name, **attrs)
def pattern_condition_checker(self, nodes_to_check)
-
Expand source code
def pattern_condition_checker(self, nodes_to_check): top_node, base_node = nodes_to_check if not self.check_condition_1(top_node.attribute): return False if not self.check_condition_2(top_node): return False if not self.check_condition_3(top_node.input[1]): return False if not self.check_condition_6(self.get_attrs(base_node), top_node.input[1]): return False return True
def pattern_matching(self, base_node)
-
Expand source code
def pattern_matching(self, base_node): inputs = base_node.input matched_nodes = self.pattern_matcher(base_node, self.pattern_to_match) if not matched_nodes: return inputs if not self.pattern_condition_checker(matched_nodes): return inputs self.transform_to_fuse(matched_nodes, nodes_to_add=[ self.make_new_node(matched_nodes) ]) return matched_nodes[0].input
def update_attrs(self, attrs, pad_input)
-
Expand source code
def update_attrs(self, attrs, pad_input): pads = [sum(x) for x in zip(attrs['pads'], self.make_maxpool_pad(pad_input))] attrs['pads'] = pads return attrs
class Pattern_2 (model)
-
transform prev –> Pad –> AveragePool –> next to prev –> AveragePool –> next
if 1. Pad.mode == 'constant' 2. Pad.constant_value == 0.0 3. padded on spatial dimension 4. AveragePool.count_include_pad == 1 or all AveragePool.pads == 0 5. AveragePool.ceil_mode == 0 6. fused_pads[i] < kernel_shape[i] and fused_pads[i + kernel_rank] < kernel_shape[i] for all i
Expand source code
class Pattern_2(Pattern_1, abc.ABC): """ transform prev --> Pad --> AveragePool --> next to prev --> AveragePool --> next if 1. Pad.mode == 'constant' 2. Pad.constant_value == 0.0 3. padded on spatial dimension 4. AveragePool.count_include_pad == 1 or all AveragePool.pads == 0 5. AveragePool.ceil_mode == 0 6. fused_pads[i] < kernel_shape[i] and fused_pads[i + kernel_rank] < kernel_shape[i] for all i """ pattern_to_match = ['Pad', 'AveragePool'] def pattern_condition_checker(self, matched_nodes): top_node, base_node = matched_nodes if not self.check_condition_1(top_node.attribute): return False if not self.check_condition_2(top_node): return False if not self.check_condition_3(top_node.input[1]): return False if not self.check_condition_4(base_node): return False if not self.check_condition_5(base_node): return False if not self.check_condition_6(self.get_attrs(base_node), top_node.input[1]): return False return True def get_attrs(self, node): rank = len(self.get_value_info_shape(node.input[0])) nspatial_dim = (rank - 2) from furiosa_sdk_quantizer.frontend.onnx.quantizer.utils import attribute_to_kwargs attrs = attribute_to_kwargs(node.attribute) ceil_mode = attrs.get('ceil_mode', 0) count_include_pad = attrs.get('count_include_pad', 0) kernel_shape = attrs['kernel_shape'] strides = attrs.get('strides', [1, ] * nspatial_dim) pads = attrs.get('pads', [0, ] * nspatial_dim * 2) return {'ceil_mode': ceil_mode, 'count_include_pad': count_include_pad, 'kernel_shape': kernel_shape, 'pads': pads, 'strides': strides} def update_attrs(self, attrs, pad_input): pads = [sum(x) for x in zip(attrs['pads'], self.make_maxpool_pad(pad_input))] attrs['pads'] = pads attrs['count_include_pad'] = 1 return attrs def check_condition_2(self, node): try: const_input = node.input[2] constant_value = self.get_initializer_array(const_input) except IndexError: constant_value = 0.0 if constant_value == 0.0: return True return False def check_condition_4(self, node): attrs = self.get_attrs(node) count_include_pad = attrs['count_include_pad'] pads = attrs['pads'] if count_include_pad == 1 or all(pad == 0 for pad in pads): return True return False def check_condition_5(self, node): attrs = self.get_attrs(node) ceil_mode = attrs['ceil_mode'] if ceil_mode == 0: return True return False def make_new_node(self, matched_nodes): top_node, base_node = matched_nodes attrs = self.update_attrs(self.get_attrs(base_node), top_node.input[1]) return self.make_node('AveragePool', [top_node.input[0]], [base_node.output[0]], name=top_node.name, **attrs)
Ancestors
- Pattern_1
- furiosa_sdk_quantizer.frontend.onnx.transformer.ONNXTransformer
- abc.ABC
Class variables
var pattern_to_match
Methods
def check_condition_2(self, node)
-
Expand source code
def check_condition_2(self, node): try: const_input = node.input[2] constant_value = self.get_initializer_array(const_input) except IndexError: constant_value = 0.0 if constant_value == 0.0: return True return False
def check_condition_4(self, node)
-
Expand source code
def check_condition_4(self, node): attrs = self.get_attrs(node) count_include_pad = attrs['count_include_pad'] pads = attrs['pads'] if count_include_pad == 1 or all(pad == 0 for pad in pads): return True return False
def check_condition_5(self, node)
-
Expand source code
def check_condition_5(self, node): attrs = self.get_attrs(node) ceil_mode = attrs['ceil_mode'] if ceil_mode == 0: return True return False
def get_attrs(self, node)
-
Expand source code
def get_attrs(self, node): rank = len(self.get_value_info_shape(node.input[0])) nspatial_dim = (rank - 2) from furiosa_sdk_quantizer.frontend.onnx.quantizer.utils import attribute_to_kwargs attrs = attribute_to_kwargs(node.attribute) ceil_mode = attrs.get('ceil_mode', 0) count_include_pad = attrs.get('count_include_pad', 0) kernel_shape = attrs['kernel_shape'] strides = attrs.get('strides', [1, ] * nspatial_dim) pads = attrs.get('pads', [0, ] * nspatial_dim * 2) return {'ceil_mode': ceil_mode, 'count_include_pad': count_include_pad, 'kernel_shape': kernel_shape, 'pads': pads, 'strides': strides}
def make_new_node(self, matched_nodes)
-
Expand source code
def make_new_node(self, matched_nodes): top_node, base_node = matched_nodes attrs = self.update_attrs(self.get_attrs(base_node), top_node.input[1]) return self.make_node('AveragePool', [top_node.input[0]], [base_node.output[0]], name=top_node.name, **attrs)
def pattern_condition_checker(self, matched_nodes)
-
Expand source code
def pattern_condition_checker(self, matched_nodes): top_node, base_node = matched_nodes if not self.check_condition_1(top_node.attribute): return False if not self.check_condition_2(top_node): return False if not self.check_condition_3(top_node.input[1]): return False if not self.check_condition_4(base_node): return False if not self.check_condition_5(base_node): return False if not self.check_condition_6(self.get_attrs(base_node), top_node.input[1]): return False return True
def update_attrs(self, attrs, pad_input)
-
Expand source code
def update_attrs(self, attrs, pad_input): pads = [sum(x) for x in zip(attrs['pads'], self.make_maxpool_pad(pad_input))] attrs['pads'] = pads attrs['count_include_pad'] = 1 return attrs