在本篇文章中,我们主要结合代码来讲解,如何在树莓派上使用Python开发基于MQTT的通讯机制,
项目的整体设计请阅读智能家居好伙伴 树莓派,MQTT和Python – 上篇
本文中提到的代码均在GitHub上共享,大家可以在文章结尾找到GitHub仓库地址。
1. 搭建环境
这个项目需要使用到的Python库包括:click,paho-mqtt,wakeonlan,apschedule,在开发之前,我们创建好虚拟环境,如果读者不知道怎么来创建虚拟环境,可以阅读 数据科学家的一种工作环境 – virtualenv和Jupyter Notebook 文章的第一章节,然后在环境中使用pip安装这些库。我们来说明一下这些库的作用:
- click - 可以帮助开发人员快速的创建命令行应用,因为我们的项目不需要有GUI的界面,是一个通过命令启动的服务,因此,使用这个库,可以帮助我们去处理命令行参数。
- paho-mqtt - 我们用到的核心库,用于实现MQTT客户端,连接到MQTT Broker发送和接收话题消息。
- wakeonlan - 可以在局域网内向某台电脑发送唤醒请求,用于唤醒服务器。
- apschedule - 可以创建定时任务,我们需要设备周期性的上报传感器采集到的数据,可以使用这个库来定时调用数据上报函数来实现。
环境搭建好以后就可以开始编码了。
2. 开发代码
2.1 采集温湿度
我们使用DHT11传感器采集室内的温湿度,这个传感器使用一个GPIO管脚来接收主控芯片发送过来的温湿度读取请求,并且在同一个管脚上回复温湿度数据,通过控制GPIO管脚上的高电平的长短来表示0和1。因此我们的主控芯片(也就是树莓派)需要能够发送请求,同时正确解码温湿度读取。
我们代码仓库里边提供提供了dht11.c文件,里边包含了这部分功能,是C代码实现,在这里就不过多的讲解了,使用makefile可以直接编译为dht11可执行文件。
这个程序从GPIO管脚获取到温湿度数据以后,会打印到控制台,打印格式为:
1
|
Humidity = 49.1 % | Temperature = 25.6 C
|
DHT11在数据中提供了校验,如果校验不通过,则dht11程序会在控制台打印:Data not good, skip
有了dht11读取程序之后,我们会在Python主程序中调用这个外部程序,并获取这个外部程序在控制台的输出,当有正确的温湿度格式数据出现在控制台以后,就将其存储下来,以便后续发送。
代码最终实现为server.py文件中的一个名为DHT11Server的类,如下所示,类方法read_data就是使用dht11来读取温湿度,并保存到类属性中,periodic_report方法用于周期性上报数据,这两个方法需要添加到apschedule的调度中,而report_temperature和report_humidity则用于响应主从模式下从其他客户端发来的数据请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
class DHT11Server(ServerBase):
def __init__(self, name) -> None:
super().__init__(name=name)
self._humidity = 0
self._temperature = 0
@property
def humidity(self):
return self._humidity
@property
def temperature(self):
return self._temperature
def read_data(self):
if platform.system() != 'Windows':
process = Popen(["./dht11"], stdout=PIPE)
(output, err) = process.communicate()
exit_code = process.wait()
lines = output.decode("utf-8").split('\n')
for line in lines:
if "=" in line:
data = line.split('|')
self._humidity = data[0].split('=')[-1].strip().split(" ")[0]
self._temperature = data[1].split('=')[-1].strip().split(" ")[0]
else:
self._humidity = 88
self._temperature = 66
def report_temperature(self, *args):
q_msg = (self._topic_response, json.dumps({"temperature": self.temperature}))
ReportQ.put(q_msg)
def report_humidity(self, *args):
q_msg = (self._topic_response, json.dumps({"humidity": self.humidity}))
ReportQ.put(q_msg)
def periodic_report(self):
topic = self._topic_broadcast_prefix + '/humidity'
q_msg = (topic, json.dumps({"value": self.humidity}))
ReportQ.put(q_msg)
topic = self._topic_broadcast_prefix + '/temperature'
q_msg = (topic, json.dumps({"value": self.temperature}))
ReportQ.put(q_msg)
|
服务器唤醒也是使用相似的类实现,类名称为LinuxServer,它和DHT11Server一样都继承与ServerBase,我们在未来增加传感器和功能的时候,也仅仅需要创建一个新的ServerBase子类就行。
你可能已经注意到了,在这个类中的数据上报方法中我们使用了一个叫ReportQ的对象,用于存储想要使用MQTT发送的信息(包括话题和消息内容),我们将在下一小节讲解这一部分设计。
2.2 MQTT通信机制
我们使用了Python内建的Queue来实现MQTT的异步通信机制,这个也就是上边提到的ReportQ对象,ReportQ是一个Queue,也就是FIFO,所有需要被发送的消息都将被存储到这个数据结构里边,我们将使用另外一个线程来监控ReportQ里边是不是有待发送的消息,如果有,就会把他们发出去。
考虑到我们的数据量很小,同时为了简化代码,我们认为数据只要被放置到ReportQ中会被马上发出去(延迟很小,或者延迟可以接受),我们通过控制前边提到的periodic_report方法的调用周期,来控制消息被放置到ReportQ中的周期,从而实现定期发送MQTT数据。
MQTT的通讯在comm.py文件中实现,其中的Session类就代表我们设计的MQTT通讯机制。在_\__init__中,我们使用paho-mqtt中的mqtt.Client()来创建MQTT客户端,同时绑定了两个方法到这个客户端:
- on_connect - 当客户端成功连接到Broker的时候,这个方法会被调用,一般来讲这个方法被调用预示着MQTT通讯通道打通,所以,我们会在这个方法中订阅我们关注的话题,以便后续接收
- on_message - 当客户端收到由Broker分发过来的(前提是已经订阅过的话题)消息时会被调用,在我们这个项目中,主要用于支持主从模式,因此在这个方法中,我们主要执行的动作就是调用信息处理回调函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
class Session(object):
def __init__(self, func=None) -> None:
self._func = func
self.client = mqtt.Client()
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(config['MQTT_BROKER']['HOST'], config['MQTT_BROKER']['PORT'], 60)
self.report_thread_running = True
self.report_thread = threading.Thread(target=self.report_status)
self.report_thread.start()
def report_status(self):
while self.report_thread_running:
if not ReportQ.empty():
msg = ReportQ.get()
# send msg with mqtt
self.client.publish(topic=msg[0],payload=msg[1])
def on_connect(self, client, userdata, flags, rc):
HomelabLogger.info("Connected with result code "+str(rc))
client.subscribe("home/+/command")
def on_message(self, client, userdata, msg):
message = dict({'topic': msg.topic, 'payload': msg.payload })
if self._func is not None:
self._func(message)
else:
HomelabLogger.info(msg.topic+" "+str(msg.payload))
def loop_forever(self):
self.client.loop_forever()
def shutdown(self):
self.report_thread_running = False
ReportQ = queue.Queue()
|
Session类中的方法report_status是实现发送MQTT消息的核心,如前边所说它在另外的一个线程中运行,并不停的监控ReportQ中是否有待发送的消息,一旦有待发送消息,它就会将其发送出去。
2.3 主程序
有了Server和Session以后,主程序就显得相当简单了,我们使用@click.command()来指定我们的main函数为一个命令行程序(更多click用法,可以参考其文档)。在main函数中,我们创建了两个Server,一个是我们的Linux服务器,主要支持wakeonlan唤醒动作,还有一个是DHT11Server用于采集室内温湿度,每次使用Server类来创建对象的时候需要制定一个名称,而这个名称则用在MQTT的话题(Topic)中被用到,用于标识我们数据的来源和对象。
后边的message_handler函数我们要稍微讲解一下,这个函数最终会在Session创建的时候传递给Session对象,前边我们提到在主从模式下收到MQTT消息以后会调用一个回调函数,这个message_handler就是我们的回调函数,它的主要逻辑是检查收到的消息是不是匹配某一个server,如果匹配的话,查询这个server是否支持消息中携带的动作,如果支持的话,就执行该动作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
@click.command()
def main():
nextcloud_server = LinuxServer('nextcloud-server', 'AB:CE:FE:1C:BD:3D', '10.0.0.108')
pi0_dht11_server = DHT11Server('pi-zero-dht11')
servers = []
servers.append(nextcloud_server)
servers.append(pi0_dht11_server)
def message_handler(msg):
HomelabLogger.info(msg)
try:
topic_parts = msg['topic'].split('/')
target_server = topic_parts[1]
msg_type = topic_parts[2]
payload = msg['payload']
for server in servers:
if server.name == target_server and msg_type == 'command':
# decode message body
command = json.loads(payload)
if hasattr(server, command['name']):
fun = getattr(server, command['name'])
if 'time' not in command.keys():
# run immediately
fun(*command['parameter'])
else:
# use scheduler to schedule a task
pass
# execute and break
break
except Exception as e:
HomelabLogger.error('!!!!!! error happend for message:')
HomelabLogger.error(e)
scheduler = BackgroundScheduler()
scheduler.add_job(pi0_dht11_server.read_data, 'interval', [], minutes=1)
scheduler.add_job(pi0_dht11_server.periodic_report, 'interval', [], minutes=1)
scheduler.start()
session = Session(func=message_handler)
try:
session.loop_forever()
except KeyboardInterrupt:
session.shutdown()
|
后边我们使用apschedule中最基础的BackgroundScheduler类来创建一个调度器,这个然后使用这个调度器的interval方式来调度任务,这个调度方式就是可以让用户来设置一个调用某个函数的周期,我们设置每间隔1分钟调用DHT11Server来读取温湿度,并且每个一分钟将DHT11Server中存储的温湿度发送出去。
3. 部署
完成编码以后,我们要部署到树莓派上运行,我们希望每次重启,或者开关树莓派的电源,我们的服务都能运行起来,我们选择使用crontab来启动服务,通过调用crontab -e命令来编译crontab,在其中添加下边这一行配置就可以完成部署了,其中/home/pi/smart-home-pi使我们在树莓派上放置源码的目录。
1
|
@reboot cd /home/pi/smart-home-pi && python3 main.py
|
4. 总结
我们在设计代码的时候考虑到了其扩展性,如果想要扩展更多的设备只需要创建一个ServerBase的子类并实现就可以了,因此可以很容易的添加更多新功能,比如我们又添加了空气质量检测(二氧化碳浓度ppm,和TVOC ppb),以及显示屏,最终效果像下边图片这样。本项目使用的代码放置在GTIHub仓库:https://github.com/pythonlibrary/smart-home-pi