Deploying a TensorRT-Quantized YOLOv5s 4.0 Model
...
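The imports and setup are elided above; the snippets below assume something along the following lines (pycuda for device buffers, and width/height set to the network input size — this header is an assumption, not shown in the original):

import os
import logging

import cv2
import numpy as np
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit  # creates a default CUDA context

TRT_LOGGER = trt.Logger(trt.Logger.INFO)
logger = logging.getLogger(__name__)

# Network input size assumed by preprocess_v1 (YOLOv5s default is 640x640)
width, height = 640, 640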
# Quantization preprocessing: keep it identical to training-time preprocessing so the calibration data is aligned
def preprocess_v1(image_raw):
    """Letterbox-resize a BGR image to the network input size (width x height globals)."""
    h, w, c = image_raw.shape
    image = cv2.cvtColor(image_raw, cv2.COLOR_BGR2RGB)
    # Calculate width and height and paddings
    r_w = width / w
    r_h = height / h
    if r_h > r_w:
        tw = width
        th = int(r_w * h)
        tx1 = tx2 = 0
        ty1 = int((height - th) / 2)
        ty2 = height - th - ty1
    else:
        tw = int(r_h * w)
        th = height
        tx1 = int((width - tw) / 2)
        tx2 = width - tw - tx1
        ty1 = ty2 = 0
    # Resize the image along the long side while maintaining the aspect ratio
    image = cv2.resize(image, (tw, th))
    # Pad the short side with (128, 128, 128)
    image = cv2.copyMakeBorder(
        image, ty1, ty2, tx1, tx2, cv2.BORDER_CONSTANT, value=(128, 128, 128)
    )
    image = image.astype(np.float32)
    # Normalize to [0, 1]
    image /= 255.0
    # HWC to CHW format
    image = np.transpose(image, [2, 0, 1])
    # CHW to NCHW format (batching is handled by the calibration stream)
    # image = np.expand_dims(image, axis=0)
    # Convert the image to row-major order, also known as "C order":
    # image = np.ascontiguousarray(image)
    return image
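The calibrator below pulls data from a calibration stream object that must expose calibration_data, batch_size, reset() and next_batch(). The original post omits that class; a minimal sketch built on preprocess_v1 (class and attribute names are assumptions) could look like this:

# Minimal calibration stream sketch (not from the original post): feeds
# preprocessed images to the Calibrator in fixed-size batches.
class CalibrationStream:
    def __init__(self, image_paths, batch_size=4):
        self.image_paths = image_paths
        self.batch_size = batch_size
        self.index = 0
        # Host buffer for one batch; the Calibrator uses its size to allocate the device buffer
        self.calibration_data = np.zeros((batch_size, 3, height, width), dtype=np.float32)

    def reset(self):
        self.index = 0

    def next_batch(self):
        if self.index + self.batch_size > len(self.image_paths):
            # An empty array signals the end of calibration to get_batch()
            return np.array([])
        for i in range(self.batch_size):
            img = cv2.imread(self.image_paths[self.index + i])
            self.calibration_data[i] = preprocess_v1(img)
        self.index += self.batch_size
        return np.ascontiguousarray(self.calibration_data)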
# INT8 calibrator built on TensorRT's IInt8EntropyCalibrator interface
class Calibrator(trt.IInt8EntropyCalibrator):
    def __init__(self, stream, cache_file=""):
        trt.IInt8EntropyCalibrator.__init__(self)
        self.stream = stream
        # Device buffer large enough to hold one calibration batch
        self.d_input = cuda.mem_alloc(self.stream.calibration_data.nbytes)
        self.cache_file = cache_file
        stream.reset()

    def get_batch_size(self):
        return self.stream.batch_size

    def get_batch(self, names):
        batch = self.stream.next_batch()
        if not batch.size:
            # Returning None tells TensorRT there are no more calibration batches
            return None
        cuda.memcpy_htod(self.d_input, batch)
        return [int(self.d_input)]

    def read_calibration_cache(self):
        # If there is a cache, use it instead of calibrating again. Otherwise, implicitly return None.
        if os.path.exists(self.cache_file):
            with open(self.cache_file, "rb") as f:
                logger.info("Using calibration cache to save time: {:}".format(self.cache_file))
                return f.read()

    def write_calibration_cache(self, cache):
        with open(self.cache_file, "wb") as f:
            logger.info("Caching calibration data for future use: {:}".format(self.cache_file))
            f.write(cache)
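Newer TensorRT releases also expose trt.IInt8EntropyCalibrator2 (entropy calibration v2), which NVIDIA generally recommends for CNN-based networks. If your TensorRT build has it, only the base class changes; a variant sketch (assumption, not from the original) is:

# Variant using the entropy calibration v2 interface; the method bodies are
# identical to Calibrator above, only the base class differs.
class CalibratorV2(trt.IInt8EntropyCalibrator2):
    def __init__(self, stream, cache_file=""):
        trt.IInt8EntropyCalibrator2.__init__(self)
        self.stream = stream
        self.d_input = cuda.mem_alloc(self.stream.calibration_data.nbytes)
        self.cache_file = cache_file
        stream.reset()

    def get_batch_size(self):
        return self.stream.batch_size

    def get_batch(self, names):
        batch = self.stream.next_batch()
        if not batch.size:
            return None
        cuda.memcpy_htod(self.d_input, batch)
        return [int(self.d_input)]

    def read_calibration_cache(self):
        if os.path.exists(self.cache_file):
            with open(self.cache_file, "rb") as f:
                return f.read()

    def write_calibration_cache(self, cache):
        with open(self.cache_file, "wb") as f:
            f.write(cache)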
# Load the ONNX model and build the TensorRT engine
def get_engine(max_batch_size=1, onnx_file_path="", engine_file_path="",
               fp16_mode=False, int8_mode=False, calibration_stream=None,
               calibration_table_path="", save_engine=False):
    """Attempts to load a serialized engine if available, otherwise builds a new TensorRT engine and saves it."""
    def build_engine(max_batch_size, save_engine):
        """Takes an ONNX file and creates a TensorRT engine to run inference with."""
        # The flag value 1 corresponds to NetworkDefinitionCreationFlag.EXPLICIT_BATCH
        with trt.Builder(TRT_LOGGER) as builder, \
                builder.create_network(1) as network, \
                trt.OnnxParser(network, TRT_LOGGER) as parser:
            # Parse the ONNX model file
            if not os.path.exists(onnx_file_path):
                quit('ONNX file {} not found'.format(onnx_file_path))
            print('Loading ONNX file from path {}...'.format(onnx_file_path))
            with open(onnx_file_path, 'rb') as model:
                print('Beginning ONNX file parsing')
                parser.parse(model.read())
            assert network.num_layers > 0, \
                'Failed to parse ONNX model. Please check if the ONNX model is compatible'
            print('Completed parsing of ONNX file')
            print('Building an engine from file {}; this may take a while...'.format(onnx_file_path))
            # Build the TensorRT engine (TensorRT 7.x builder attributes; removed in TensorRT 8+)
            builder.max_batch_size = max_batch_size
            builder.max_workspace_size = 1 << 30  # 1GB
            builder.fp16_mode = fp16_mode
            if int8_mode:
                builder.int8_mode = int8_mode
                assert calibration_stream, 'Error: a calibration_stream should be provided for int8 mode'
                builder.int8_calibrator = Calibrator(calibration_stream, calibration_table_path)
                print('Int8 mode enabled')
            engine = builder.build_cuda_engine(network)
            if engine is None:
                print('Failed to create the engine')
                return None
            print("Completed creating the engine")
            if save_engine:
                with open(engine_file_path, "wb") as f:
                    f.write(engine.serialize())
            return engine

    if os.path.exists(engine_file_path):
        # If a serialized engine exists, load it instead of building a new one.
        print("Reading engine from file {}".format(engine_file_path))
        with open(engine_file_path, "rb") as f, trt.Runtime(TRT_LOGGER) as runtime:
            return runtime.deserialize_cuda_engine(f.read())
    else:
        return build_engine(max_batch_size, save_engine)
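A typical call for INT8 calibration might look like the following; the file names, the calib_images directory, and the CalibrationStream class above are assumptions for illustration, not from the original post:

# Hypothetical usage: build an INT8 engine from an exported YOLOv5s 4.0 ONNX model.
if __name__ == '__main__':
    calib_images = [os.path.join('calib_images', f) for f in os.listdir('calib_images')]
    calibration_stream = CalibrationStream(calib_images, batch_size=4)
    engine = get_engine(
        max_batch_size=1,
        onnx_file_path='yolov5s.onnx',
        engine_file_path='yolov5s_int8.trt',
        fp16_mode=False,
        int8_mode=True,
        calibration_stream=calibration_stream,
        calibration_table_path='yolov5s_calibration.cache',
        save_engine=True,
    )
    assert engine is not None, 'Engine build failed'
    # The serialized engine can now be used for inference via an execution context
    context = engine.create_execution_context()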