用树莓派、PyPortal Titano和machinechat JEDI One设置和测试MQTT broker服务器

描述

该项目使用machinechat的JEDI One物联网数据管理软件在树莓派4上设置了一个物联网MQTT broker 服务器。JEDI One包括一个数据收集器,可以将其配置为MQTT broker 服务器,外部客户机设备可以向其发布和(或)订阅JEDI One上的主题。为了测试MQTT broker 服务器,将Adafruit PyPortal Titano设置为客户端设备,该设备通过WiFi订阅和发布树莓派上的MQTT broker 服务器。

image

背景

MQTT(消息队列遥测传输)是一个开放的OASIS和ISO标准,它定义了在设备之间传输消息的轻量级发布-订阅网络协议。它包括两种类型的网络实体:一个消息代理和一些客户端。machinechat的JEDI One物联网平台包括一个数据收集器,可以配置为MQTT消息代理。外部客户机设备可以配置为在JEDI One上发布数据或订阅主题(主题包括从任何来源进入JEDI One的所有数据,而不仅仅是MQTT)。JEDI One要求发布消息采用JSON有效负载格式,并以JSON有效负载格式提供订阅消息。

硬件

软件

  • JEDI One
    是一款即用型物联网数据管理软件解决方案。功能包括:收集来自传感器、设备和机器的数据;构建直观的实时和历史数据以及系统视图仪表板;创建规则,自动监控和响应数据情况;通过电子邮件和短信接收警报通知。
  • CircuitPython
    是Adafruit的MicroPython分支,旨在简化低成本微控制器的实验和教育。不需要编译器、链接器或IDE。

实现

对于该项目,JEDI One应用程序先前已安装在树莓派上,并设置了HTTP数据收集器并接收外部传感器数据。然后设置JEDI One MQTT数据收集器。为了测试系统,将PyPortal Titano配置为订阅和发布MQTT broker 服务器上的主题。CircuitPython用于在PyPortal上实现应用程序代码。

设置JEDI One MQTT broker 服务器

1 - 如果machinechat JEDI One尚未安装在树莓派上,请参见以下内容:

2 - 配置MQTT broker 服务器

在JEDI One“数据收集器”选项卡中,选择“添加收集器”并进行配置。命名“数据收集器”,并为“收集器类型”选择“MQTT Broker”。MQTT采集器设置截图中的“监听IP”是JEDI One树莓派的IP地址,“监听端口”是1883。(注:以下是未加密配置的示例,但可以按照machinechat产品指南-如何生成TLS证书和密钥中所示的方式配置TLS加密)

设置PyPortal CircuitPython MQTT客户端测试应用程序
Pyportal测试应用程序有三部分:

  1. MQTT客户端,订阅正在JEDI One上收集的现有传感器数据

  1. 将传感器数据发布到JEDI One的MQTT客户端

image

  1. 显示应用程序,将订阅的传感器数据打印到PyPortal Titano显示器

image

1 - 在PyPortal Titano上设置CircuitPython。参见链接CircuitPython | Adafruit PyPortal Titano | Adafruit Learning System
(注意:本项目使用CircuitPython 6.3.0)

2 - 安装应用程序所需的库:

检查以下库是否安装在CIRCUITPY (D:\lib)中。所需的库可从CircuitPython库下载。(注:本项目使用adafruit-circuitpython-bundle-6.x-mpy-20210618.zip)

import neopixel
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
import adafruit_minimqtt.adafruit_minimqtt as MQTT
from adafruit_pyportal import PyPortal
from adafruit_bitmap_font import bitmap_font
from adafruit_display_text import label

3 - 代码演练(文件名:mqtt_jedi_display.py)

初始设置代码:

import time
import board
import busio
from digitalio import DigitalInOut
import neopixel
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import adafruit_esp32spi.adafruit_esp32spi_socket as socket
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import json
import sys
from adafruit_pyportal import PyPortal
import displayio
import terminalio
from adafruit_bitmap_font import bitmap_font
from adafruit_display_text import label

unique_id = "ppt10:52:1C:88:B6:F8" # set unique id based on pyportal MAC
latest_msg = "test"                # set initial message

display = board.DISPLAY
main_group = displayio.Group(max_size=10)
MEDIUM_FONT = bitmap_font.load_font("fonts/Arial-16.bdf")
BIG_FONT = bitmap_font.load_font("fonts/Arial-Bold-24.bdf")

### WiFi ###

# Get wifi details and more from a secrets.py file
try:
    from secrets import secrets
except ImportError:
    print("WiFi secrets are kept in secrets.py, please add them there!")
    raise

# If you are using a board with pre-defined ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

# If you have an externally connected ESP32:
# esp32_cs = DigitalInOut(board.D9)
# esp32_ready = DigitalInOut(board.D10)
# esp32_reset = DigitalInOut(board.D5)

spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
"""Use below for Most Boards"""
status_light = neopixel.NeoPixel(
    board.NEOPIXEL, 1, brightness=0.2
)  # Uncomment for Most Boards

wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)

设置MQTT发布和订阅提要,打印到PyPortal Titano显示

### Feeds ###

# Setup a feed named 'photocell' for publishing to a feed
photocell_feed = "datacache/photocell"  # modify feed for JEDI One MQTT broker


# Setup a feed named 'pump' for subscribing to changes
pump_feed = "datacache/T960981B2D"     # map feed to JEDI One MQTT broker data stream


### Code ###

# Define callback methods which are called when events occur
# pylint: disable=unused-argument, redefined-outer-name
def connected(client, userdata, flags, rc):
    # This function will be called when the client is connected
    # successfully to the broker.
    print("Connected to JEDI One MQTT broker ! Listening for topic changes on %s" % pump_feed)
    # Subscribe to all changes on the pump_feed.
    client.subscribe(pump_feed)


def disconnected(client, userdata, rc):
    # This method is called when the client is disconnected
    print("Disconnected from JEDI One!")


def message(client, topic, message):
    # This method is called when a topic the client is subscribed to
    # has a new message.
    global latest_msg
    print("New message on topic {0}: {1}".format(topic, message))
    latest_msg = message
    parsed = json.loads(latest_msg)
    data1 = "Humidity " + str(parsed["data1"])
    print(data1)
    print(parsed["data2"])
    print(parsed["data3"])
    print(parsed["timestamp"])
    print(latest_msg)


# Connect to WiFi
print("Connecting to WiFi...")
wifi.connect()
print("Connected!")


# Initialize MQTT interface with the esp interface
MQTT.set_socket(socket, esp)


# Set up a MiniMQTT Client with JEDI One MQTT broker
#  set client_id to unique id for connecting to mqtt broker
mqtt_client = MQTT.MQTT(
    broker="192.168.1.7",
    port = 1883,
    client_id = unique_id,
)

# Setup the callback methods above
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message

# Connect the client to the MQTT broker.
print("Connecting to JEDI One MQTT broker...")
mqtt_client.connect()

# print header message on display
text_area1 = label.Label(BIG_FONT, text="Pump House Monitor", max_glyphs=40)
text_area1.x = 10
text_area1.y = 10
main_group.append(text_area1)
display.show(main_group)


data1 = "1234567"  #set default display message
text_area = label.Label(MEDIUM_FONT, text=data1, max_glyphs=40)
text_area.x = 10
text_area.y = 100
main_group.append(text_area)
display.show(main_group)

text_area2 = label.Label(MEDIUM_FONT, text="abcde", max_glyphs=40)
text_area2.x = 10
text_area2.y = 200
main_group.append(text_area2)
display.show(main_group)

# photocell is a simulated sensor used by the pyportal to test publishing to JEDI One MQTT broker
photocell_val = 0  # set initial photocell value

# map photocell data dictionary object in prep to be compatible with JEDI One mqtt broker
photocell_dict = {}
photocell_dict["deviceType"] = "photocell"
photocell_dict["value"] = photocell_val
print(photocell_dict)

while True:
    # Poll the message queue
    mqtt_client.loop()
    if latest_msg != "test":  # check to see if new mqtt subscribe message
        print("received mqtt subscribe message")
        parsed = json.loads(latest_msg)
        data1 = "Humidity " + str(parsed["data1"]) + " %"
        data2 = "Temperature " + str(parsed["data2"]) + " F"
        print(data1)
        text_area.text = data1   # update humidity value for display
        text_area2.text = data2  # update temperature value for display
        latest_msg = "test"  # reset latest message

    photocell_dict["value"] = photocell_val       # update to latest photocell value
    photocell_json = json.dumps(photocell_dict)   # convert photocell data to json format for JEDI One mqtt broker
    print(photocell_json)
    # Send a new message
    print("Sending photocell json string: " + photocell_json)
    mqtt_client.publish(photocell_feed, photocell_json)
    print("Sent!")
    photocell_val += 1
    if photocell_val > 100: # reset counter for simulated sensor data
        photocell_val = 0
    print(latest_msg)
    print(data1)
    time.sleep(11)

MQTT客户端测试应用程序的最新源代码在github上,链接如下:

结论

machine echat的JEDI One数据管理软件和树莓派的结合形成了一个独立的、低成本的、易于使用的物联网MQTT broker 服务器平台。可以很容易地将客户机设备配置为订阅和/或发布到MQTT broker 服务器平台上的主题,而不需要第三方云服务或互联网连接。

参考文献