工业汽车嵌入式

听懂汽车的语言 – 使用Python控制CAN总线

听懂汽车的语言 – 使用Python控制CAN总线

CAN总线是在汽车和工业领域广泛应用的一种通讯方式,电脑上并没有配备相应的物理硬件,市面上有很多公司提供不同的将CAN转为USB的设备,使用这些设备电脑就能够借由USB通过CAN总线跟汽车或者其他工业设备进行交流,通常来说,这些公司都会针对其对应的硬件设备提供相应的软件,不过因为其专业性,要不软件的价格比较贵的,要不免费的软件功能有限。

虽然是一个比较冷门的应用场景,但使用Python确实也是可以使用这些设备来进行CAN总线通讯的,这都得益于丰富的Python生态,在这一篇文章中,我们将介绍一个能够支持市面上常用的CAN设备的Python库: python-can,这个库做的非常好,一方面可以支持多种不同的CAN设备(像 Vector, Kvaser, PCAN 等等),另一方面对总线的抽象也很好,同时提供了内置的log记录器,可以通过几行简单的代码将总线数据保存为常见的asc,csv,blf,甚至是数据库格式。

本文不会介绍跑在CAN上边的各种应用层协议,而会以一个最小可运行项目的方式,介绍如何使用这个库进行最基本的消息收发,而在此之上,读者如果有相应的 需求(自动化测试,诊断工具等等),可以根据不同的应用层协议实现自己的应用。

文章中的代码可以在GitHub仓库 https://github.com/pythonlibrary/talk-to-your-car 中找到。

1. 示例代码功能

本文使用的示例代码将实现这样一个功能目标:

  • 使用Intrepid的 ValueCAN3 作为硬件,硬件的图片见下图
  • 以脚本的方式运行,没有UI
  • 当脚本运行起来以后,使用ValueCAN3的通道1定期的向总线上广播ID为0x7E0的消息,而数据是随机生成的,同时也会监听这个通道上收到的任何消息,并以asc格式保存到名为logfile.asc的日志文件中,而在保存文件的同时,脚本会在终端窗口打印出当前收到的消息。

另外,python-can这个库还提供了一个虚拟总线环境,让你可以在不使用任何硬件的情况下,模拟总线的收发,在我们的代码中,也提供了虚拟总线的版本,实现的功能跟上边写的是一样的,只是,不需要使用任何硬件,如果读者手边没有这样的硬件,也可以通过虚拟的方式学习这个库的使用。

仓库中有三个文件:

  • neovi_bus.py - 使用ValueCAN3实现
  • virtual_bus.py - 使用虚拟总线实现
  • logfile.asc - 日志文件的示例

2. 安装库和依赖

首先我们使用pip安装python-can和ValueCAN3硬件相关的依赖,包括:驱动和python-ics(这是ValueCAN3官方的python wrapper)

1
pip install python-can python-ics

针对ValueCAN3,python-can主要要使用到其驱动中的 icsneo40.dll 这个dll文件,在驱动安装中会默认安装到系统目录,如果不安装,则需要手动将这个文件放到我们的工作目录。

针对其他的CAN设备(Vector,PCAN或其他),可以在文档中照当对应的安装方法。 这个链接中提供了所有python-can支持的硬件的安装说明 https://python-can.readthedocs.io/en/master/interfaces.html

安装完毕后,我们用下边这段代码来测试一下是否正常工作

1
2
3
import can

bus = can.interface.Bus(bustype='neovi', channel='1', bitrate=500000)

如果没有任何异常抛出,那么证明一切正常,但是因为要跟硬件交互,python-can需要调用dll或驱动(一般是c/c++实现),出现异常也是挺常见的,最常见到的就是:

1
ics.RuntimeError: Error: find_devices(): Failed to open library: 'icsneo40.dll' with error code: #193

这是一个很典型的在python中使用dll会出现的问题, Intrepid 官方提供的驱动是32 bit的,而如果你使用了64 bit的python就会报这个错误,所以你能做的就是使用32 bit的python。


3. 虚拟总线

利用虚拟总线可以在没有硬件的情况下实现跟底层无关的代码,不需要使用任何驱动和dll,只需要使用python-can就可以完成。

需要注意的是在总线上有收发节点,不同的节点要连接在相同的虚拟总线上,这些节点必须在同一个python应用里边实现,也就是说如果,你写了两个py脚本,一个发送,一个接收,这两个脚本分别运行,他们之间的虚拟总线是没有连接的,所以是不能互相通讯的。

在我们的virtual_bus.py代码中

  1. 我们创建了一个发送(bus_tx),一个接收(bus_rx)两个节点,并都连接到virtual_ch上
  2. 我们创建了一个名为logger的日志记录器,该日志记录器会将受到的信息写入到logfile.asc文件,记录器会根据文件的后缀来决定写入的格式,因此,它将会生成一个asc格式的日志,如果这里后缀为csv,blf或者其它,则会生成相应格式的文件。
  3. 日志记录器logger被放置到了一个listeners的列表中,里边还包含一个print_message的回调函数,这个是我们自己实现的函数,只是简单的将输入参数msg打印出来。
  4. listeners列表通过Notifier绑定到接收节点(bus_rx),这样只要有新的消息在接收节点上收到,这两个listener就会被执行,print_message会将消息打印到终端,而logger会将消息写入到logfile.asc文件。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
if __name__ == "__main__":
    # RX part
    bus_rx = can.interface.Bus('virtual_ch', bustype='virtual')
    logger = can.Logger("logfile.asc")  # save log to asc file
    listeners = [
        print_message,  # Callback function, print the received messages
        logger,  # save received messages to asc file
    ]
    notifier = can.Notifier(bus_rx, listeners)

    # TX part
    bus_tx = can.interface.Bus('virtual_ch', bustype='virtual')
    tx_service = tx_thread_cl(bus_tx)
    tx_service.start()

    running = True
    while running:
        input()
        running = False
    
    while not tx_service.finished:
        tx_service.stop()

    # It's important to stop the notifier in order to finish the writting of asc file
    notifier.stop()
    # stops the bus
    bus_tx.shutdown()
    bus_rx.shutdown()
  1. 为了实现周期发送消息,我们自己定义了一个线程,在线程中,会每隔0.5s发送CANID为0x7E0,数据为随机的CAN消息到总线上 。
  2. 使用can.Message来构建CAN消息对象,而使用bus.send就可以将消息发送出去。
  3. 脚本会监控键盘按键,一旦有任何按键被按下,脚本则会进入退出程序,在退出部分一定要调用notifier.stop()来停止 notifier ,只有这样才能确保logger能够完整正确的写入日志,如果不显示的调用这个函数来停止,则日志内容无法成功写入。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
class tx_thread_cl:

    def __init__(self, bus):
        self.bus = bus
        self.running = True
        self.thread = threading.Thread(target=self.tx_callback, args=(self.bus,))
        self.finished = False
    
    def start(self):
        self.thread.start()

    def tx_callback(self, bus):
        while self.running:
            data = [random.randint(0,15) for i in range(0,8)]
            msg = can.Message(is_extended_id=False, arbitration_id=0x7E0, data=data)
            bus.send(msg)
            time.sleep(0.5)
        self.finished = True
    
    def stop(self):
        self.running = False

这就是全部的代码,很简单对吧,接下来我们简单的调整一下,使用实际的ValueCAN3 硬件来通讯。

4. 真实的CAN总线

像我们之前提到的虚拟总线可以让你不适用真实CAN硬件的情况下完成应用部分,因此如果将虚拟总线换成真实的CAN总线也相当的容易,仅仅需要实例化真实的bus对象,并用其替代虚拟总线。

参考neovi_bus.py中的更改,将bus_rx和bus_tx替换为统一的 bus(即接收也发送),bus则通过下边这样的方式实例化。

1
bus = can.interface.Bus(bustype='neovi', channel='1', bitrate=500000)

bustype指定为neovi,即 Intrepid 公司CAN硬件的统称,channel指定通道为1通道,bitrate指定波特率。

5. 运行测试

如我们所说,脚本会在终端中打印收到的CAN数据,并存储到logfile.asc文件中。

关于 Python酷

Python之所以如此流行,在于它有强大的生态,使用各种各种的库可以帮助用户最快速的解决问题。Python酷致力于输出高质量的Python库相关教程及技术性文章,帮助用户更好更快速的解决问题