新增从配置文件传入物理值

This commit is contained in:
锦鲤 2025-06-27 00:21:42 +08:00
parent f5d26bdf25
commit bdc4dc0883
4 changed files with 49 additions and 20 deletions

View File

@ -30,6 +30,9 @@ CanCodecProcessor 参数说明:
+ dbc_fileDBC文件路径 + dbc_fileDBC文件路径
+ dbc_info加载DBC文件信息 + dbc_info加载DBC文件信息
+ dbc_info_details更详细的DBC文件信息 + dbc_info_details更详细的DBC文件信息
+ save_config是否进行保存配置文件
+ load_sig_cfg是否从配置文件中加载信号值
+ config_save_path自定义配置文件保存路径默认为空

4
dbc_file/.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
*
!test.dbc
!.gitignore

View File

@ -15,7 +15,7 @@ from scripts.parser_info_output import InfoOutput
dbc_file = "./dbc_file/test.dbc" dbc_file = "./dbc_file/test.dbc"
codec_processor = CanCodecProcessor(dbc_file, dbc_info=False, dbc_info_details=False) codec_processor = CanCodecProcessor(dbc_file, dbc_info=False, dbc_info_details=False, save_config=True, load_sig_cfg=False)
# 根据 DBC 文件、物理值去组建 CAN 报文示例调用: # 根据 DBC 文件、物理值去组建 CAN 报文示例调用:
message_name = "radar" # 替换为目标消息名称 message_name = "radar" # 替换为目标消息名称

View File

@ -32,16 +32,25 @@ RESET = '\033[0m'
class CanCodecProcessor: class CanCodecProcessor:
def __init__(self, dbc_path: str, dbc_info: bool=False, dbc_info_details: bool=False): def __init__(self, dbc_path: str, dbc_info: bool=False, dbc_info_details: bool=False, save_config: bool=True, \
self.Debug = True load_sig_cfg: bool=False, config_save_path: str=""):
self.Debug = False
self.dbc_info = dbc_info self.dbc_info = dbc_info
self.save_config = save_config
self.load_sig_cfg = load_sig_cfg
self.dbc_info_details = dbc_info_details self.dbc_info_details = dbc_info_details
self.config_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + '/config/can_config.json'
self.info_oput = InfoOutput() self.info_oput = InfoOutput()
self._lock = threading.Lock() self._lock = threading.Lock()
self.dbc = self._load_dbc_file(dbc_path) self.dbc = self._load_dbc_file(dbc_path)
self._load_default_config(self.config_dir, self.dbc)
if config_save_path == "":
self.config_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + '/config/can_config.json'
else:
self.config_dir = config_save_path
if self.save_config:
self._save_default_signal_config(self.config_dir, self.dbc)
def _load_dbc_file(self, dbc_path: str) -> Optional[cantools.db.Database]: def _load_dbc_file(self, dbc_path: str) -> Optional[cantools.db.Database]:
"""加载 DBC 文件,并进行异常处理""" """加载 DBC 文件,并进行异常处理"""
@ -62,7 +71,7 @@ class CanCodecProcessor:
print(f"{RED}[ERROR] 加载 DBC 文件时发生未知错误: {e}{RESET}") print(f"{RED}[ERROR] 加载 DBC 文件时发生未知错误: {e}{RESET}")
exit(1) exit(1)
def _load_default_config(self, output_json_path: str, db: Optional[cantools.db.Database]) -> bool: def _save_default_signal_config(self, output_json_path: str, db: Optional[cantools.db.Database]) -> bool:
"""JSON配置保存默信号解析保存""" """JSON配置保存默信号解析保存"""
if Path(output_json_path).exists(): if Path(output_json_path).exists():
print("[INFO] DBC文件报文和信号初始值配置文件已存在.") print("[INFO] DBC文件报文和信号初始值配置文件已存在.")
@ -143,6 +152,13 @@ class CanCodecProcessor:
with open(json_path, 'w') as f: with open(json_path, 'w') as f:
json.dump(read_config, f, indent=2) json.dump(read_config, f, indent=2)
def _load_can_config(self, signal_name: str, json_path: str):
"""加载CAN Signal JSON配置"""
with open(json_path, 'r', encoding='utf-8') as json_file:
signal_data = json.load(json_file)
return signal_data["encode_messages"][f"{signal_name}_data"]["signals"]
def _apply_signal_attributes(self, raw_value: Union[int, float], signal: cantools.db.Signal) -> float: def _apply_signal_attributes(self, raw_value: Union[int, float], signal: cantools.db.Signal) -> float:
"""应用信号属性(缩放因子和偏移量)检查""" """应用信号属性(缩放因子和偏移量)检查"""
physical_value = raw_value * signal.scale + signal.offset physical_value = raw_value * signal.scale + signal.offset
@ -156,7 +172,7 @@ class CanCodecProcessor:
return physical_value return physical_value
def encode_message(self, message_name: str, signal_values:json): def encode_message(self, message_name: str, signal_values:json = None):
"""编码信号为 CAN 报文数据""" """编码信号为 CAN 报文数据"""
# 获取消息对象 # 获取消息对象
message = self.dbc.get_message_by_name(message_name) message = self.dbc.get_message_by_name(message_name)
@ -164,19 +180,23 @@ class CanCodecProcessor:
# 构造信号值 # 构造信号值
signal_data = {} signal_data = {}
for signal in message.signals: if self.save_config and self.load_sig_cfg:
signal_name = signal.name signal_data = self._load_can_config(message_name, self.config_dir)
else:
for signal in message.signals:
signal_name = signal.name
# 如果 signal_values 中有这个信号名,使用用户提供的值 # 如果 signal_values 中有这个信号名,使用用户提供的值
if signal_name in signal_values: if signal_name in signal_values:
signal_data[signal_name] = signal_values[signal_name] signal_data[signal_name] = signal_values[signal_name]
else: else:
# 否则设为 0 # 否则设为 0
signal_data[signal_name] = 0 signal_data[signal_name] = 0
# 是否更新用户提供的信号值 # 是否更新用户提供的信号值
if signal_values: if signal_values:
signal_data.update(signal_values) signal_data.update(signal_values)
# 编码信号为 CAN 数据 # 编码信号为 CAN 数据
encoded_data = self.dbc.encode_message(message_name, signal_data) encoded_data = self.dbc.encode_message(message_name, signal_data)
@ -197,7 +217,10 @@ class CanCodecProcessor:
data = bytes.fromhex(data) data = bytes.fromhex(data)
decoded_signals = self.dbc.decode_message(can_id, data) decoded_signals = self.dbc.decode_message(can_id, data)
self._save_decode_config(self.config_dir, can_id, data)
if self.save_config:
self._save_decode_config(self.config_dir, can_id, data)
return decoded_signals return decoded_signals
def return_encoded_info(self, can_message: hex, encoded_data: int, signal_data: str) -> dict: def return_encoded_info(self, can_message: hex, encoded_data: int, signal_data: str) -> dict:
@ -208,4 +231,3 @@ class CanCodecProcessor:
"decimal": [byte for byte in encoded_data], "decimal": [byte for byte in encoded_data],
"can_msg": '[{}]'.format(formatted_data), "can_msg": '[{}]'.format(formatted_data),
} }