工业汽车嵌入式

智能家居好伙伴 树莓派,MQTT和Python – 下篇

智能家居好伙伴 树莓派,MQTT和Python – 下篇

在本篇文章中,我们主要结合代码来讲解,如何在树莓派上使用Python开发基于MQTT的通讯机制, 项目的整体设计请阅读智能家居好伙伴 树莓派,MQTT和Python – 上篇 本文中提到的代码均在GitHub上共享,大家可以在文章结尾找到GitHub仓库地址。

1. 搭建环境

这个项目需要使用到的Python库包括:click,paho-mqtt,wakeonlan,apschedule,在开发之前,我们创建好虚拟环境,如果读者不知道怎么来创建虚拟环境,可以阅读 数据科学家的一种工作环境 – virtualenv和Jupyter Notebook 文章的第一章节,然后在环境中使用pip安装这些库。我们来说明一下这些库的作用:

  • click - 可以帮助开发人员快速的创建命令行应用,因为我们的项目不需要有GUI的界面,是一个通过命令启动的服务,因此,使用这个库,可以帮助我们去处理命令行参数。
  • paho-mqtt - 我们用到的核心库,用于实现MQTT客户端,连接到MQTT Broker发送和接收话题消息。
  • wakeonlan - 可以在局域网内向某台电脑发送唤醒请求,用于唤醒服务器。
  • apschedule - 可以创建定时任务,我们需要设备周期性的上报传感器采集到的数据,可以使用这个库来定时调用数据上报函数来实现。

环境搭建好以后就可以开始编码了。

2. 开发代码

2.1 采集温湿度

我们使用DHT11传感器采集室内的温湿度,这个传感器使用一个GPIO管脚来接收主控芯片发送过来的温湿度读取请求,并且在同一个管脚上回复温湿度数据,通过控制GPIO管脚上的高电平的长短来表示0和1。因此我们的主控芯片(也就是树莓派)需要能够发送请求,同时正确解码温湿度读取。

我们代码仓库里边提供提供了dht11.c文件,里边包含了这部分功能,是C代码实现,在这里就不过多的讲解了,使用makefile可以直接编译为dht11可执行文件。

这个程序从GPIO管脚获取到温湿度数据以后,会打印到控制台,打印格式为:

1
Humidity = 49.1 % | Temperature = 25.6 C

DHT11在数据中提供了校验,如果校验不通过,则dht11程序会在控制台打印:Data not good, skip

有了dht11读取程序之后,我们会在Python主程序中调用这个外部程序,并获取这个外部程序在控制台的输出,当有正确的温湿度格式数据出现在控制台以后,就将其存储下来,以便后续发送。

代码最终实现为server.py文件中的一个名为DHT11Server的类,如下所示,类方法read_data就是使用dht11来读取温湿度,并保存到类属性中,periodic_report方法用于周期性上报数据,这两个方法需要添加到apschedule的调度中,而report_temperature和report_humidity则用于响应主从模式下从其他客户端发来的数据请求。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class DHT11Server(ServerBase):

    def __init__(self, name) -> None:
        super().__init__(name=name)
        self._humidity = 0
        self._temperature = 0
    
    @property
    def humidity(self):
        return self._humidity

    @property
    def temperature(self):
        return self._temperature
    
    def read_data(self):
        if platform.system() != 'Windows':
            process = Popen(["./dht11"], stdout=PIPE)
            (output, err) = process.communicate()
            exit_code = process.wait()

            lines = output.decode("utf-8").split('\n')

            for line in lines:
                if "=" in line:
                    data = line.split('|')
                    self._humidity = data[0].split('=')[-1].strip().split(" ")[0]
                    self._temperature = data[1].split('=')[-1].strip().split(" ")[0]
        else:
            self._humidity = 88
            self._temperature = 66
    
    def report_temperature(self, *args):
        q_msg = (self._topic_response, json.dumps({"temperature": self.temperature}))
        ReportQ.put(q_msg)

    def report_humidity(self, *args):
        q_msg = (self._topic_response, json.dumps({"humidity": self.humidity}))
        ReportQ.put(q_msg)

    def periodic_report(self):
        topic = self._topic_broadcast_prefix + '/humidity'
        q_msg = (topic, json.dumps({"value": self.humidity}))
        ReportQ.put(q_msg)
        topic = self._topic_broadcast_prefix + '/temperature'
        q_msg = (topic, json.dumps({"value": self.temperature}))
        ReportQ.put(q_msg)

服务器唤醒也是使用相似的类实现,类名称为LinuxServer,它和DHT11Server一样都继承与ServerBase,我们在未来增加传感器和功能的时候,也仅仅需要创建一个新的ServerBase子类就行。

你可能已经注意到了,在这个类中的数据上报方法中我们使用了一个叫ReportQ的对象,用于存储想要使用MQTT发送的信息(包括话题和消息内容),我们将在下一小节讲解这一部分设计。

2.2 MQTT通信机制

我们使用了Python内建的Queue来实现MQTT的异步通信机制,这个也就是上边提到的ReportQ对象,ReportQ是一个Queue,也就是FIFO,所有需要被发送的消息都将被存储到这个数据结构里边,我们将使用另外一个线程来监控ReportQ里边是不是有待发送的消息,如果有,就会把他们发出去。

考虑到我们的数据量很小,同时为了简化代码,我们认为数据只要被放置到ReportQ中会被马上发出去(延迟很小,或者延迟可以接受),我们通过控制前边提到的periodic_report方法的调用周期,来控制消息被放置到ReportQ中的周期,从而实现定期发送MQTT数据。

MQTT的通讯在comm.py文件中实现,其中的Session类就代表我们设计的MQTT通讯机制。在_\__init__中,我们使用paho-mqtt中的mqtt.Client()来创建MQTT客户端,同时绑定了两个方法到这个客户端:

  • on_connect - 当客户端成功连接到Broker的时候,这个方法会被调用,一般来讲这个方法被调用预示着MQTT通讯通道打通,所以,我们会在这个方法中订阅我们关注的话题,以便后续接收
  • on_message - 当客户端收到由Broker分发过来的(前提是已经订阅过的话题)消息时会被调用,在我们这个项目中,主要用于支持主从模式,因此在这个方法中,我们主要执行的动作就是调用信息处理回调函数
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class Session(object):

    def __init__(self, func=None) -> None:
        self._func = func

        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        self.client.connect(config['MQTT_BROKER']['HOST'], config['MQTT_BROKER']['PORT'], 60)

        self.report_thread_running = True
        self.report_thread = threading.Thread(target=self.report_status)
        self.report_thread.start()
    
    def report_status(self):
        while self.report_thread_running:
            if not ReportQ.empty():
                msg = ReportQ.get()
                # send msg with mqtt
                self.client.publish(topic=msg[0],payload=msg[1])

    def on_connect(self, client, userdata, flags, rc):

        HomelabLogger.info("Connected with result code "+str(rc))
        client.subscribe("home/+/command")

    def on_message(self, client, userdata, msg):
        message = dict({'topic': msg.topic, 'payload': msg.payload })
        if self._func is not None:
            self._func(message)
        else:
            HomelabLogger.info(msg.topic+" "+str(msg.payload))

    def loop_forever(self):
        self.client.loop_forever()
    
    def shutdown(self):
        self.report_thread_running = False

ReportQ = queue.Queue()

Session类中的方法report_status是实现发送MQTT消息的核心,如前边所说它在另外的一个线程中运行,并不停的监控ReportQ中是否有待发送的消息,一旦有待发送消息,它就会将其发送出去。

2.3 主程序

有了Server和Session以后,主程序就显得相当简单了,我们使用@click.command()来指定我们的main函数为一个命令行程序(更多click用法,可以参考其文档)。在main函数中,我们创建了两个Server,一个是我们的Linux服务器,主要支持wakeonlan唤醒动作,还有一个是DHT11Server用于采集室内温湿度,每次使用Server类来创建对象的时候需要制定一个名称,而这个名称则用在MQTT的话题(Topic)中被用到,用于标识我们数据的来源和对象。

后边的message_handler函数我们要稍微讲解一下,这个函数最终会在Session创建的时候传递给Session对象,前边我们提到在主从模式下收到MQTT消息以后会调用一个回调函数,这个message_handler就是我们的回调函数,它的主要逻辑是检查收到的消息是不是匹配某一个server,如果匹配的话,查询这个server是否支持消息中携带的动作,如果支持的话,就执行该动作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@click.command()
def main():

    nextcloud_server = LinuxServer('nextcloud-server', 'AB:CE:FE:1C:BD:3D', '10.0.0.108')
    pi0_dht11_server = DHT11Server('pi-zero-dht11')

    servers = []
    servers.append(nextcloud_server)
    servers.append(pi0_dht11_server)

    def message_handler(msg):
        HomelabLogger.info(msg)
        try:
            topic_parts = msg['topic'].split('/')
            target_server = topic_parts[1] 
            msg_type = topic_parts[2] 
            payload = msg['payload']
            for server in servers:
                if server.name == target_server and msg_type == 'command':
                    # decode message body
                    command = json.loads(payload)

                    if hasattr(server, command['name']):
                        fun = getattr(server, command['name'])
                        if 'time' not in command.keys():
                            # run immediately
                            fun(*command['parameter'])
                        else:
                            # use scheduler to schedule a task
                            pass
                    # execute and break
                    break
        except Exception as e:
            HomelabLogger.error('!!!!!! error happend for message:')
            HomelabLogger.error(e)

    scheduler = BackgroundScheduler()
    scheduler.add_job(pi0_dht11_server.read_data, 'interval', [], minutes=1)
    scheduler.add_job(pi0_dht11_server.periodic_report, 'interval', [], minutes=1)
    scheduler.start()

    session = Session(func=message_handler)

    try:
        session.loop_forever()
    except KeyboardInterrupt:
        session.shutdown()

后边我们使用apschedule中最基础的BackgroundScheduler类来创建一个调度器,这个然后使用这个调度器的interval方式来调度任务,这个调度方式就是可以让用户来设置一个调用某个函数的周期,我们设置每间隔1分钟调用DHT11Server来读取温湿度,并且每个一分钟将DHT11Server中存储的温湿度发送出去。

3. 部署

完成编码以后,我们要部署到树莓派上运行,我们希望每次重启,或者开关树莓派的电源,我们的服务都能运行起来,我们选择使用crontab来启动服务,通过调用crontab -e命令来编译crontab,在其中添加下边这一行配置就可以完成部署了,其中/home/pi/smart-home-pi使我们在树莓派上放置源码的目录。

1
@reboot cd /home/pi/smart-home-pi && python3 main.py

4. 总结

我们在设计代码的时候考虑到了其扩展性,如果想要扩展更多的设备只需要创建一个ServerBase的子类并实现就可以了,因此可以很容易的添加更多新功能,比如我们又添加了空气质量检测(二氧化碳浓度ppm,和TVOC ppb),以及显示屏,最终效果像下边图片这样。本项目使用的代码放置在GTIHub仓库:https://github.com/pythonlibrary/smart-home-pi

关于 Python酷

Python之所以如此流行,在于它有强大的生态,使用各种各种的库可以帮助用户最快速的解决问题。Python酷致力于输出高质量的Python库相关教程及技术性文章,帮助用户更好更快速的解决问题