Module furiosa.quantizer.furiosa_sdk_quantizer.frontend.onnx.quantizer.utils
from typing import List, Tuple, Dict
import warnings
import onnx
import numpy as np
from onnx import TensorProto, TensorAnnotation, StringStringEntryProto
import onnxruntime as ort
from onnxruntime_tools.quantization.quantize import _attribute_to_kwarg
__PRODUCER__ = "jason_furiosa"
# This OP list is based on the ONNX operator spec.
# Integer_math and Integer_non_math ops can handle int8 inputs, whereas Sandwich ops cannot.
# Note: Concat is an exception, as it requires re-quantizing multiple inputs.
__DYNAMIC_RANGE_COLLECTORS__ = {
'integer_math': [
'Conv',
'MatMul',
],
'integer_non_math': [
'MaxPool',
'Squeeze',
'Unsqueeze',
'Gather',
'Transpose',
'Reshape',
'DepthToSpace',
'Expand',
'Clip',
'Split',
'Pad',
'Resize',
'Flatten',
'Slice',
],
'sandwich': [
'Gemm',
'Add',
'ReduceMean',
'Softmax',
'Relu',
'Concat',
'Softplus',
'ReduceL2',
'LayerNormalization',
'Gelu',
'GlobalAveragePool',
'Sigmoid',
'Mul',
'AveragePool',
'ReduceSum',
'Div',
'ConvTranspose',
'LpNormalization',
],
}
class QuantizationMode:
    # dfg: Quantize the graph for DFG (quantized graph) export.
    dfg = 0
    # fake: Evaluate the quantized graph, replacing QConvLinear with Conv2d/MatMul.
    fake = 1
def get_qrange(qtype):
"""
source: onnxruntime quantization tools
"""
if qtype == TensorProto.UINT8:
return 255 # 2^b - 1
elif qtype == TensorProto.INT8:
return 254 # [-(2^{b-1}-1), 2^{b-1}-1]: [-127, 127] for 8 bits.
else:
raise ValueError('unsupported quantization data type')
def get_vi_dtype(vi):
"""
This function returns value_info's data type
:param vi: graph.value_info
:return: graph.value_info.type.tensor_type.elem_type
"""
return vi.type.tensor_type.elem_type
def is_float_tensor(vi):
    return get_vi_dtype(vi) == onnx.TensorProto.FLOAT
def activation_scale_zeropoint(rmin, rmax, input_qtype):
    # Extend the range to include 0 so that the real value 0.0 is exactly
    # representable in the quantized domain.
    rmin = min(rmin, 0)
    rmax = max(rmax, 0)
return asymmetric_scale_zeropoint(rmin, rmax, input_qtype)
def asymmetric_scale_zeropoint(rmin, rmax, input_qtype):
"""
source: onnxruntime quantization tools
"""
scale = np.float32((rmax - rmin) / 255 if rmin != rmax else 1)
# The minimum positive (subnormal) value is 2 ** -149 for IEEE 754 single-precision binary floating-point format
# source: https://en.wikipedia.org/wiki/Single-precision_floating-point_format#Exponent_encoding
scale = max(scale, 2 ** -149)
if input_qtype == TensorProto.UINT8:
initial_zero_point = (0 - rmin) / scale
zero_point = np.uint8(round(max(0, min(255, initial_zero_point))))
return np.array(zero_point), np.array(scale)
elif input_qtype == TensorProto.INT8:
initial_zero_point = -128 - rmin / scale
zero_point = np.int8(round(max(-128, min(127, initial_zero_point))))
return np.array(zero_point), np.array(scale)
else:
        raise ValueError('input_qtype must be one of UINT8 or INT8')
def calculate_activation_quant_params(dynamic_ranges: Dict,
node_list: List[onnx.NodeProto],
input_qtype: TensorProto,
value_info: Dict) -> Dict:
quantization_params = {}
for node in node_list:
# Quantize activation input/output, following TFLite's quantization specification
if node.op_type in ['MaxPool', 'Squeeze', 'Unsqueeze', 'Gather', 'Transpose', 'Reshape',
'DepthToSpace', 'Expand', 'Flatten', 'GlobalAveragePool', 'AveragePool']:
if not is_float_tensor(value_info[node.input[0]]):
continue
if node.input[0] not in quantization_params.keys():
quantization_params[node.input[0]] = activation_scale_zeropoint(
*dynamic_ranges[node.input[0]],
input_qtype)
quantization_params[node.output[0]] = quantization_params[node.input[0]]
elif node.op_type in ['Softmax', 'Sigmoid']:
if node.input[0] not in quantization_params.keys():
quantization_params[node.input[0]] = activation_scale_zeropoint(
*dynamic_ranges[node.input[0]],
input_qtype)
            if input_qtype == TensorProto.INT8:
                zero_point = np.array(np.int8(-128))
            elif input_qtype == TensorProto.UINT8:
                zero_point = np.array(np.uint8(0))
            else:
                raise ValueError('input_qtype must be one of UINT8 or INT8')
            # Softmax/Sigmoid outputs lie in [0, 1]: fix the output scale to 1/256.
            quantization_params[node.output[0]] = (zero_point, np.array(np.float32(1.0 / 256.0)))
elif node.op_type in ['LpNormalization']:
if node.input[0] not in quantization_params.keys():
quantization_params[node.input[0]] = activation_scale_zeropoint(
*dynamic_ranges[node.input[0]],
input_qtype)
            if input_qtype == TensorProto.INT8:
                zero_point = np.array(np.int8(0))
            elif input_qtype == TensorProto.UINT8:
                zero_point = np.array(np.uint8(128))
            else:
                raise ValueError('input_qtype must be one of UINT8 or INT8')
            # LpNormalization outputs lie in [-1, 1]: fix the output scale to 1/128.
            quantization_params[node.output[0]] = (zero_point, np.array(np.float32(1.0 / 128.0)))
else:
for name in list(node.input) + list(node.output):
if name not in dynamic_ranges:
continue
if name in quantization_params.keys():
continue
rmin, rmax = dynamic_ranges[name]
zero_point, scale = activation_scale_zeropoint(rmin, rmax, input_qtype)
quantization_params[name] = (zero_point, scale)
return quantization_params
def calculate_weight_quant_params(data: np.ndarray, weight_qtype: TensorProto) -> Tuple[int, float]:
"""
:parameter data: data to quantize
:parameter weight_qtype: quantization data type of weight
:return: quantized weights, zero point, scale
To pack weights, we compute a linear transformation
- when data type == uint8 mode, from [rmin, rmax] -> [0, 2^{b-1}] and
- when data type == int8, from [-m , m] -> [-(2^{b-1}-1), 2^{b-1}-1] where
m = max(abs(rmin), abs(rmax))
and add necessary intermediate nodes to trasnform quantized weight to full weight using the equation
r = S(q-z), where
r: real original value
q: quantized value
S: scale
z: zero point
source: onnxruntime quantization tools
"""
    # data is expected to be a flat (1-D) array of weight values
    rmin = min(min(data), 0)
    rmax = max(max(data), 0)
quantized_range = get_qrange(weight_qtype)
if weight_qtype == TensorProto.INT8:
max_range = max(abs(rmin), abs(rmax))
if max_range > 0:
scale = (max_range * 2.0) / quantized_range
else:
warnings.warn('both the min and the max of data are 0')
scale = 1.0
zero_point = 0
elif weight_qtype == TensorProto.UINT8:
scale = (float(rmax) - rmin) / quantized_range if rmin != rmax else 1
zero_point = round((0 - rmin) / scale) # round to nearest integer
else:
raise ValueError(
"Unexpected data type {} requested. Only INT8 and UINT8 are supported.".format(weight_qtype))
# The minimum positive (subnormal) value is 2 ** -149 for IEEE 754 single-precision binary floating-point format
# source: https://en.wikipedia.org/wiki/Single-precision_floating-point_format#Exponent_encoding
scale = max(scale, 2 ** -149)
return zero_point, scale
def make_tensor_annotation(tensor_name, zero_point_name, scale_name):
    '''
    Helper function to build a TensorAnnotation that links a tensor to its
    scale and zero-point tensors.
    '''
annot = TensorAnnotation()
annot.tensor_name = tensor_name
quant_param_scale = StringStringEntryProto()
quant_param_scale.key = 'SCALE_TENSOR'
quant_param_scale.value = scale_name
quant_param_zero_point = StringStringEntryProto()
quant_param_zero_point.key = 'ZERO_POINT_TENSOR'
quant_param_zero_point.value = zero_point_name
annot.quant_parameter_tensor_names.extend([quant_param_scale, quant_param_zero_point])
return annot
def attribute_to_kwargs(attributes: List[onnx.AttributeProto]) -> Dict:
kwargs = {}
for attr in attributes:
kwargs.update(_attribute_to_kwarg(attr))
return kwargs
def append_suffix(name: str, suffix: List[str]) -> List[str]:
"""
Helper function to append suffixes to the given name.
"""
return list(map(lambda x: name + x, suffix))
def get_input_tensors(model: onnx.ModelProto) -> List[str]:
    ort.set_default_logger_severity(3)  # 3 = ERROR: suppress onnxruntime warnings
sess = ort.InferenceSession(model.SerializeToString())
input_tensors = [inp.name for inp in sess.get_inputs()]
return input_tensors
Functions
def activation_scale_zeropoint(rmin, rmax, input_qtype)
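A minimal usage sketch, with this module's functions in scope (the range values are illustrative): because the range is first extended to include zero, an all-positive activation range still yields an exactly representable real 0.0.

from onnx import TensorProto

# Hypothetical all-positive range observed during calibration.
zero_point, scale = activation_scale_zeropoint(0.5, 6.0, TensorProto.UINT8)
# rmin is clamped to 0, so scale = (6.0 - 0.0) / 255 and zero_point = 0.
print(zero_point, scale)  # 0 0.023529412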
def append_suffix(name: str, suffix: List[str]) -> List[str]
Helper function to append suffixes to the given name.
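For example, generating the names of a tensor's quantization-parameter tensors (the base name and suffixes here are illustrative):

names = append_suffix('conv0_weight', ['_zero_point', '_scale'])
# ['conv0_weight_zero_point', 'conv0_weight_scale']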
def asymmetric_scale_zeropoint(rmin, rmax, input_qtype)
source: onnxruntime quantization tools
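A worked example (values illustrative): the scale depends only on the width of the range, while the zero point shifts with the target integer type.

from onnx import TensorProto

rmin, rmax = -1.0, 3.0
zp_u8, scale_u8 = asymmetric_scale_zeropoint(rmin, rmax, TensorProto.UINT8)
zp_i8, scale_i8 = asymmetric_scale_zeropoint(rmin, rmax, TensorProto.INT8)
# scale = (rmax - rmin) / 255 = 4 / 255 for both types, while
# uint8: zero_point = round((0 - rmin) / scale)  = 64
# int8:  zero_point = round(-128 - rmin / scale) = -64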
def attribute_to_kwargs(attributes: List[onnx.onnx_ml_pb2.AttributeProto]) -> Dict
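A sketch of the intended round trip: a node's repeated attribute field is flattened into a dict that onnx.helper.make_node accepts again.

import onnx

node = onnx.helper.make_node('MaxPool', ['x'], ['y'],
                             kernel_shape=[2, 2], strides=[2, 2])
kwargs = attribute_to_kwargs(node.attribute)
# {'kernel_shape': [2, 2], 'strides': [2, 2]}
rebuilt = onnx.helper.make_node('MaxPool', ['x'], ['y'], **kwargs)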
def calculate_activation_quant_params(dynamic_ranges: Dict, node_list: List[onnx.onnx_ml_pb2.NodeProto], input_qtype: onnx.onnx_ml_pb2.TensorProto, value_info: Dict) -> Dict
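A minimal sketch with a hypothetical one-node graph and hand-written calibration ranges. Relu falls into the generic branch, so its input and output each get their own parameters; a data-movement op such as MaxPool would instead share its input's parameters with its output.

from onnx import TensorProto, helper

x = helper.make_tensor_value_info('x', TensorProto.FLOAT, [1, 4])
y = helper.make_tensor_value_info('y', TensorProto.FLOAT, [1, 4])
relu = helper.make_node('Relu', ['x'], ['y'])

dynamic_ranges = {'x': (-1.0, 3.0), 'y': (0.0, 3.0)}  # from calibration
value_info = {'x': x, 'y': y}

params = calculate_activation_quant_params(
    dynamic_ranges, [relu], TensorProto.UINT8, value_info)
# params maps 'x' and 'y' each to a (zero_point, scale) pair.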
def calculate_weight_quant_params(data: numpy.ndarray, weight_qtype: onnx.onnx_ml_pb2.TensorProto) -> Tuple[int, float]
:parameter data: data to quantize
:parameter weight_qtype: quantization data type of weight
:return: zero point and scale of the quantized weight
To pack weights, we compute a linear transformation
- for uint8, from [rmin, rmax] -> [0, 2^b - 1], and
- for int8, from [-m, m] -> [-(2^{b-1} - 1), 2^{b-1} - 1], where m = max(abs(rmin), abs(rmax)),
and add necessary intermediate nodes to transform the quantized weight to the full-precision weight using the equation r = S(q - z), where r: real original value, q: quantized value, S: scale, z: zero point.
source: onnxruntime quantization tools
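A worked numeric example (weights illustrative): with INT8 the mapping is symmetric, so the zero point is always 0 and scale = 2 * max(|rmin|, |rmax|) / 254. Quantizing and dequantizing with r = S(q - z) recovers the weights up to rounding error.

import numpy as np
from onnx import TensorProto

weight = np.array([-1.0, 0.25, 0.5], dtype=np.float32)
zp, scale = calculate_weight_quant_params(weight, TensorProto.INT8)
# zp == 0, scale == 2.0 / 254 ≈ 0.007874

q = np.clip(np.round(weight / scale) + zp, -127, 127).astype(np.int8)
r = scale * (q.astype(np.float32) - zp)  # r ≈ weight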
def get_input_tensors(model: onnx.onnx_ml_pb2.ModelProto) -> List[str]
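A minimal sketch with a hypothetical one-node model; going through an onnxruntime InferenceSession (rather than reading graph.input directly) means initializers are excluded, so only the inputs that must actually be fed are returned.

from onnx import TensorProto, helper

x = helper.make_tensor_value_info('x', TensorProto.FLOAT, [1])
y = helper.make_tensor_value_info('y', TensorProto.FLOAT, [1])
graph = helper.make_graph([helper.make_node('Identity', ['x'], ['y'])],
                          'g', [x], [y])
model = helper.make_model(graph)
print(get_input_tensors(model))  # ['x']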
def get_qrange(qtype)
source: onnxruntime quantization tools
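The INT8 range is deliberately one narrower than 2^8 - 1 so that it stays symmetric around zero:

from onnx import TensorProto

assert get_qrange(TensorProto.UINT8) == 255  # [0, 255]
assert get_qrange(TensorProto.INT8) == 254   # [-127, 127]; -128 is unused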
def get_vi_dtype(vi)
This function returns value_info's data type
:param vi: graph.value_info
:return: graph.value_info.type.tensor_type.elem_type
def is_float_tensor(vi)
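A quick sketch of both helpers on hand-made value_info entries (names illustrative):

from onnx import TensorProto, helper

vi = helper.make_tensor_value_info('x', TensorProto.FLOAT, [1, 3])
assert get_vi_dtype(vi) == TensorProto.FLOAT
assert is_float_tensor(vi)

idx = helper.make_tensor_value_info('idx', TensorProto.INT64, [1])
assert not is_float_tensor(idx)  # e.g. integer Gather indices are skipped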
def make_tensor_annotation(tensor_name, zero_point_name, scale_name)
Helper function to build a TensorAnnotation that links a tensor to its scale and zero-point tensors.
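A usage sketch (tensor names hypothetical); the resulting annotation is meant to be appended to the graph's quantization_annotation field:

annot = make_tensor_annotation('conv0_out',
                               'conv0_out_zero_point',
                               'conv0_out_scale')
# model.graph.quantization_annotation.extend([annot])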
Classes
class QuantizationMode
Class variables
var dfg
Quantize the graph for DFG (quantized graph) export.
var fake
Evaluate the quantized graph, replacing QConvLinear with Conv2d/MatMul.