158 lines
6.1 KiB
Python
158 lines
6.1 KiB
Python
#!/usr/bin/env python
|
|
# -*-coding:utf-8 -*-
|
|
|
|
'''
# @Author      : 幸运锦鲤
# @Time        : 2025-03-26 10:55:41
# @version     : python3
# @Update time :
# @Description : Encode and decode CAN messages using a DBC file
'''
|
|
|
|
from pathlib import Path
|
|
import cantools
|
|
import can
|
|
import binascii
|
|
|
|
|
|
class CANCodec:
    """Encode and decode CAN messages using the definitions in a DBC file.

    The database returned by ``cantools.database.load_file`` is stored in
    ``self.db`` and used by :meth:`encode_message` and :meth:`decode_message`.
    The ``print_*`` / ``return_*`` helpers only format data that was already
    encoded or decoded.
    """

    def __init__(self, dbc_file):
        """Initialise the codec by loading *dbc_file* into ``self.db``."""
        self.db = self._load_dbc(dbc_file)

    def _load_dbc(self, dbc_file):
        """Load a DBC file and return the resulting cantools database.

        Args:
            dbc_file: Path (str or path-like) of the DBC file.

        Raises:
            FileNotFoundError: If *dbc_file* does not exist.
            Exception: Any error raised while loading is logged to stdout and
                re-raised unchanged.
        """
        dbc_name = Path(dbc_file).name
        if not Path(dbc_file).exists():
            # Positional "{}" placeholders: the previous "{dbc_file}" named
            # placeholder raised KeyError here and masked the real error.
            print("[ERROR] DBC 文件不存在: {}".format(dbc_name))
            raise FileNotFoundError("DBC 文件不存在: {}".format(dbc_name))

        try:
            db = cantools.database.load_file(dbc_file)
            print("[INFO] 成功加载 DBC 文件: {}".format(dbc_name))
            return db
        except Exception as e:
            print("[ERROR] 加载 DBC 文件失败: {}".format(e))
            raise

    def encode_message(self, message_name, signal_values=None):
        """Encode signal values into the payload of the named CAN message.

        Args:
            message_name: Name of the message in the loaded database.
            signal_values: Optional mapping of signal name to value. Signals
                of the message that are not listed default to 0. ``None``
                (the default) is treated as an empty mapping.

        Returns:
            A tuple ``(can_message, encoded_data, signal_data)``:
            the ``can.Message`` built from the encoded payload, the payload
            bytes right-padded with zero bytes to at least 8 bytes, and the
            full signal dictionary that was encoded.
        """
        # Treat the None default as "no overrides" so the lookups below work.
        if signal_values is None:
            signal_values = {}

        # Look up the message definition.
        message = self.db.get_message_by_name(message_name)
        can_id = message.frame_id

        # Every signal of the message gets a value: caller-supplied or 0.
        signal_data = {
            signal.name: signal_values.get(signal.name, 0)
            for signal in message.signals
        }
        # Keep any extra caller-supplied entries, as the original code did.
        signal_data.update(signal_values)

        # Encode the signals into the CAN payload.
        encoded_data = self.db.encode_message(message_name, signal_data)

        # Build the CAN message object from the (unpadded) payload.
        can_message = can.Message(
            arbitration_id=can_id,
            data=encoded_data,
            is_extended_id=message.is_extended_frame,
        )

        # Only the returned bytes are padded to 8; the can.Message created
        # above keeps the payload exactly as the database encoded it.
        encoded_data = encoded_data.ljust(8, b'\x00')

        return can_message, encoded_data, signal_data

    def decode_message(self, can_id, data):
        """Decode a CAN payload into a mapping of signal values.

        Args:
            can_id: Frame id (or name) passed to ``self.db.decode_message``.
            data: Payload as bytes, or as a list/tuple of integer byte values.

        Returns:
            Whatever ``self.db.decode_message`` returns for the payload.
        """
        if isinstance(data, (list, tuple)):
            data = bytes(data)

        return self.db.decode_message(can_id, data)

    @staticmethod
    def _format_bytes(data):
        """Return *data* rendered as ``"0x01, 0xFF"`` style text."""
        return ', '.join(f"0x{byte:02X}" for byte in data)

    @staticmethod
    def _print_signals(signals):
        """Print each signal's name and its decimal, hex and binary value.

        Floats are truncated with ``int()`` before hex/binary rendering.
        Values that cannot be rendered as an integer are shown as ``N/A``
        instead of raising.
        """
        for signal_name, signal_value in signals.items():
            print(f"Signal Name: {signal_name}")
            print(f" Value (Decimal): {signal_value}")
            # hex()/format(..., 'b') only accept integers, so convert floats.
            if isinstance(signal_value, float):
                signal_value = int(signal_value)
            try:
                hex_text = hex(signal_value)
                bin_text = format(signal_value, 'b')
            except (TypeError, ValueError):
                hex_text = bin_text = "N/A"
            print(f" Value (Hex): {hex_text}")
            print(f" Value (Binary): {bin_text}")

    def return_encoded_info(self, can_message, encoded_data, signal_data):
        """Return the encoded message as a dictionary of display strings.

        Args:
            can_message: Object exposing ``arbitration_id``.
            encoded_data: Encoded payload bytes.
            signal_data: Unused; kept for signature parity with
                :meth:`print_encoded_info`.

        Returns:
            A dict with ``can_id`` (hex string), ``decimal`` (list of byte
            values) and ``can_msg`` (``"[0x.., 0x..]"`` string).
        """
        return {
            "can_id": hex(can_message.arbitration_id),
            "decimal": list(encoded_data),
            "can_msg": '[{}]'.format(self._format_bytes(encoded_data)),
        }

    def print_encoded_info(self, can_message, encoded_data, signal_data):
        """Print the source signals and the encoded payload in several bases."""
        formatted_data = self._format_bytes(encoded_data)

        print("\nOriginal Signals:")
        self._print_signals(signal_data)

        print("\nCoded Data:")
        print(f"CAN ID: {hex(can_message.arbitration_id)}")
        print(f"Encoded Data (Hex): {binascii.hexlify(encoded_data).upper().decode('utf-8')}")
        print(f"Encoded Data (Binary): {' '.join(format(byte, '08b') for byte in encoded_data)}")
        print(f"Encoded Data (Decimal): {list(encoded_data)}")
        print(f"Encoded Data (CanMsg): [{formatted_data}]")

    def print_decoded_info(self, can_id, data, decoded_signals):
        """Print the raw payload and the decoded signals in several bases.

        Args:
            can_id: Integer CAN id of the frame.
            data: Payload as bytes, or as a list/tuple of integer byte values.
            decoded_signals: Mapping of signal name to decoded value.
        """
        print("\nDecoded Data:")
        print(f"CAN ID: {hex(can_id)}")
        if isinstance(data, (list, tuple)):
            data = bytes(data)
        print(f"Raw Data (Hex): {binascii.hexlify(data).upper().decode('utf-8')}")
        print(f"Raw Data (Binary): {' '.join(format(byte, '08b') for byte in data)}")
        print(f"Raw Data (Decimal): {list(data)}")

        print("\nDecoded Signals:")
        self._print_signals(decoded_signals)
|
|
|
|
|
|
if __name__ == "__main__":
    # Demo entry point: build a codec from a DBC file, encode one message
    # and print the result. Point the path at your own DBC file.
    codec = CANCodec("./can_dbc_related/test.dbc")

    # Encoding example: replace the message name and signals with your own.
    # ("Var1": 250 is an additional signal left disabled in this demo.)
    demo_signals = {"nav_status": 1, "nav_direction": 100}
    encode_result = codec.encode_message("navigation", demo_signals)
    can_message, encoded_data, signal_data = encode_result

    print(f"\nSignal Data:{signal_data}")
    codec.print_encoded_info(can_message, encoded_data, signal_data)

    # Decoding example (bytes payload), disabled by default:
    # can_id = 0x701  # replace with the target CAN id
    # data = b'\x00\x04\x00\x00\x00\x00\x00\x00'  # replace with the frame payload
    # decoded_signals = codec.decode_message(can_id, data)
    # codec.print_decoded_info(can_id, data, decoded_signals)

    # Decoding example (payload given as a list of decimal byte values),
    # disabled by default:
    # can_id = 0x130  # replace with the target CAN id
    # data = [0, 4, 0, 0, 0, 0, 0, 0]  # replace with the frame payload
    # decoded_signals = codec.decode_message(can_id, data)
    # codec.print_decoded_info(can_id, data, decoded_signals)
|